Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -65,6 +65,11 @@
@Slf4j
public class EbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN> {
public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

// Maximum number of URNs per SQL IN clause. Keeps queries safe for MySQL query planner and packet limits.
// Aspect columns are always all selected in each chunk — only URN count is chunked.
static final int MAX_URNS_PER_QUERY = 100;

private final EbeanServer _server;
private final Class<URN> _urnClass;
private final String _entityType;
Expand Down Expand Up @@ -300,13 +305,18 @@ public <ASPECT_UNION extends RecordTemplate> int batchUpsert(
}

/**
* Construct and execute a SQL statement as follows.
* SELECT urn, aspect1, lastmodifiedon, lastmodifiedby FROM metadata_entity_foo WHERE JSON_EXTRACT(aspect1, '$.gma_deleted') IS NULL
* AND urn IN ('urn:1', 'urn:2', 'urn:3')
* Fetch aspects for the given keys in a single SQL query. Generates:
* SELECT urn, a_col1, a_col2, ..., lastmodifiedon, lastmodifiedby, createdfor
* FROM metadata_entity_foo WHERE urn IN (...) [AND deleted_ts IS NULL]
*
* <p>Aspect-level soft-deletes (gma_deleted) are NOT filtered in SQL — they are returned as marker rows
* and must be filtered by callers (e.g., EbeanLocalDAO.toRecordTemplate checks isSoftDeletedAspect).
* The includeSoftDeleted flag controls only asset-level deletion (deleted_ts column).
*
* @param aspectKeys a List of keys (urn, aspect pairings) to query for
* @param keysCount number of keys to query
* @param position position of the key to start from
* @param includeSoftDeleted whether to include soft deleted aspect in the query
* @param includeSoftDeleted whether to include asset-level soft deleted entities (deleted_ts)
* @param isTestMode whether the operation is in test mode or not
*/
@Override
Expand All @@ -315,31 +325,38 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
boolean includeSoftDeleted, boolean isTestMode) {

final int end = Math.min(aspectKeys.size(), position + keysCount);
final Map<Class<ASPECT>, Set<Urn>> keysToQueryMap = new HashMap<>();

// Collect all valid aspect columns and all URNs, validating column existence per aspect class.
// Use LinkedHashSet for allUrns so IN-clause order is deterministic across runs/JVMs.
final Set<Urn> allUrns = new LinkedHashSet<>();
final Map<String, Class<ASPECT>> columnToAspectClassMap = new LinkedHashMap<>();
for (int index = position; index < end; index++) {
final Urn entityUrn = aspectKeys.get(index).getUrn();
final Class<ASPECT> aspectClass = (Class<ASPECT>) aspectKeys.get(index).getAspectClass();
if (validator.columnExists(isTestMode ? getTestTableName(entityUrn) : getTableName(entityUrn),
getAspectColumnName(entityUrn.getEntityType(), aspectClass))) {
keysToQueryMap.computeIfAbsent(aspectClass, unused -> new HashSet<>()).add(entityUrn);
final String tableName = isTestMode ? getTestTableName(entityUrn) : getTableName(entityUrn);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to make sure it's the same table name?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we don't support different URN types here, but it seems we did support that previously?

final String columnName = getAspectColumnName(entityUrn.getEntityType(), aspectClass);
if (validator.columnExists(tableName, columnName)) {
columnToAspectClassMap.putIfAbsent(columnName, aspectClass);
allUrns.add(entityUrn);
}
}

// each statement is for a single aspect class
Map<String, Class<ASPECT>> selectStatements = keysToQueryMap.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted,
isTestMode), entry -> entry.getKey()));

// consolidate/join the results
final Map<SqlRow, Class<ASPECT>> sqlRows = new LinkedHashMap<>();
for (Map.Entry<String, Class<ASPECT>> entry : selectStatements.entrySet()) {
for (SqlRow sqlRow : _server.createSqlQuery(entry.getKey()).findList()) {
sqlRows.put(sqlRow, entry.getValue());
}
if (columnToAspectClassMap.isEmpty()) {
return Collections.emptyList();
}

// gma_deleted is NOT filtered in SQL — handled per-column in Java by readMultiAspectSqlRows.
// Chunk by URN count to keep IN clause size safe for MySQL. Single-URN-batch case is handled
// by the same loop with a single iteration.
final List<EbeanMetadataAspect> results = new ArrayList<>();
final List<Urn> urnList = new ArrayList<>(allUrns);
for (int i = 0; i < urnList.size(); i += MAX_URNS_PER_QUERY) {
final Set<Urn> chunk = new LinkedHashSet<>(urnList.subList(i, Math.min(i + MAX_URNS_PER_QUERY, urnList.size())));
final String sql = SQLStatementUtils.createMultiAspectReadSql(
columnToAspectClassMap.keySet(), chunk, includeSoftDeleted, isTestMode);
results.addAll(EBeanDAOUtils.readMultiAspectSqlRows(_server.createSqlQuery(sql).findList(), columnToAspectClassMap));
}
return EBeanDAOUtils.readSqlRows(sqlRows);
return results;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,16 @@ private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends
final List<EbeanMetadataAspect> oneStatementResult = batchGetHelper(new ArrayList<>(keys), keysCount, position);
finalResult.addAll(oneStatementResult);
}

// For DUAL_SCHEMA: compare the accumulated old-schema results (paged) against a single
// unpaged new-schema fetch. Doing the comparison here (rather than per-page in batchGetHelper)
// ensures both sides cover the same key set when keys.size() > keysCount.
boolean nonLatestVersionFlag = keys.stream().anyMatch(key -> key.getVersion() != LATEST_VERSION);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this makes the code very hard to review and maintain. Can we simply use different methods for the old and new schemas? Or is the old schema even still alive? If not, can we just remove the comparison code?

if (!nonLatestVersionFlag && _schemaConfig == SchemaConfig.DUAL_SCHEMA && _localAccess != null) {
final List<EbeanMetadataAspect> resultsNewSchema =
_localAccess.batchGetUnion(new ArrayList<>(keys), keys.size(), 0, false, false);
EBeanDAOUtils.compareResults(finalResult, resultsNewSchema, "batchGet");
}
return finalResult;
}

Expand Down Expand Up @@ -1383,16 +1393,19 @@ List<EbeanMetadataAspect> batchGetHelper(@Nonnull List<AspectKey<URN, ? extends
}

if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) {
return _localAccess.batchGetUnion(keys, keysCount, position, false, false);
// For new schema, all aspects are columns in a single SELECT — no need for keysCount pagination.
// The first call (position=0) fetches everything; subsequent pages return empty to avoid duplicates.
if (position > 0) {
return Collections.emptyList();
}
return _localAccess.batchGetUnion(keys, keys.size(), 0, false, false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we are forcing pagination to be unused here. Should we then simply remove it?

}

if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
// Compare results from both new and old schemas
final List<EbeanMetadataAspect> resultsOldSchema = batchGetUnion(keys, keysCount, position);
final List<EbeanMetadataAspect> resultsNewSchema =
_localAccess.batchGetUnion(keys, keysCount, position, false, false);
EBeanDAOUtils.compareResults(resultsOldSchema, resultsNewSchema, "batchGet");
return resultsOldSchema;
// Return paginated old-schema results per page; the cross-schema comparison is hoisted to
// batchGet() so both sides cover the same key set (avoiding spurious mismatches when
// keys.size() > keysCount).
return batchGetUnion(keys, keysCount, position);
}

log.error("Please check that the SchemaConfig supplied to EbeanLocalDAO constructor is valid.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,27 @@ <ASPECT_UNION extends RecordTemplate> int batchUpsert(@Nonnull URN urn,
@Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode);

/**
* Get read aspects from entity table. This is a new schema implementation for batchGetUnion() in {@link EbeanLocalDAO}
* Fetch aspects from the entity table using a single multi-column SELECT per URN chunk.
* This is the new-schema implementation for batchGetUnion() in {@link EbeanLocalDAO}.
*
* <p>Aspect-level soft-deletes (gma_deleted) are always returned as marker rows — callers must
* filter them (e.g., via {@code EbeanLocalDAO.toRecordTemplate} which checks {@code isSoftDeletedAspect}).
* The {@code includeSoftDeleted} flag controls only asset-level deletion (deleted_ts column).
*
* <p>URNs are chunked internally (max {@link EbeanLocalAccess#MAX_URNS_PER_QUERY} per SQL IN clause).
*
* <p><b>Cross-product semantics:</b> the input {@code keys} are reduced to a unique set of URNs and a
* unique set of aspect columns. The query selects ALL collected aspect columns for ALL collected URNs.
* If a caller passes heterogeneous pairs like {@code [(urn1, AspectFoo), (urn2, AspectBar)]} and both
* URNs have both columns populated, this returns 4 {@link EbeanMetadataAspect} entries — not the 2
* requested. The internal callers in {@link EbeanLocalDAO} re-filter via {@code matchKeys}, but external
* callers requesting heterogeneous (urn, aspect) pairs must filter to the exact requested pairs themselves.
*
* @param keys {@link AspectKey} to retrieve aspect metadata
* @param keysCount pagination key count limit
* @param position starting position of pagination
* @param includeSoftDeleted include soft deleted aspects, default false
* @param keysCount slice window: the method processes {@code aspectKeys[position .. min(size, position+keysCount))}
* when collecting URNs and aspect columns. Callers that want to process all keys pass {@code keys.size()}.
* @param position starting index for the slice window (callers that want to process all keys pass {@code 0})
* @param includeSoftDeleted whether to include asset-level soft deleted entities (deleted_ts)
* @param isTestMode whether the operation is in test mode or not
* @param <ASPECT> metadata aspect value
* @return a list of {@link EbeanMetadataAspect} as get response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,30 @@ public static <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> readSqlR
}).collect(Collectors.toList());
}

/**
* Read {@link SqlRow} list from a multi-aspect query into a {@link EbeanMetadataAspect} list.
* Unlike {@link #readSqlRows(Map)}, this handles rows that contain MULTIPLE aspect columns.
* Each non-null aspect column in a row produces a separate EbeanMetadataAspect in the result.
*
* <p>Soft-deleted aspects (gma_deleted JSON marker) are NOT filtered here — they are returned
* with their marker metadata and soft-delete PrimaryKey populated by {@link #readSqlRow}.
* Callers must filter via {@link #isSoftDeletedMetadata} if soft-deleted aspects should be excluded.
*
* @param sqlRows list of {@link SqlRow} from the multi-aspect query
* @param columnToAspectClassMap mapping of column name to aspect class for each requested aspect
* @param <ASPECT> aspect class type
* @return list of {@link EbeanMetadataAspect}
*/
public static <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> readMultiAspectSqlRows(
    @Nonnull List<SqlRow> sqlRows,
    @Nonnull Map<String, Class<ASPECT>> columnToAspectClassMap) {
  // Walk the rows in the order received; within each row, walk the requested aspect columns in the
  // map's iteration order. A column contributes one result only when the row holds a non-null value for it.
  return sqlRows.stream()
      .flatMap(row -> columnToAspectClassMap.keySet().stream()
          // Skip aspect columns that are not populated for this row.
          .filter(columnName -> row.get(columnName) != null)
          // Parse the row once per populated column, using that column's aspect class.
          .map(columnName -> readSqlRow(row, columnToAspectClassMap.get(columnName))))
      .collect(Collectors.toList());
}

/**
* Parse a list of {@link SqlRow} results from an entity table into a map of
* URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,57 @@ public static <ASPECT extends RecordTemplate> String createAspectReadSql(@Nonnul
return stringBuilder.toString();
}

/**
* Create a single SQL statement to read multiple aspect columns for a set of URNs.
* Unlike {@link #createAspectReadSql} which generates one SQL per aspect class, this generates a single query
* selecting all requested aspect columns at once. The gma_deleted check is NOT included in SQL — it must be
* handled in Java by the caller (checking each aspect column individually after retrieval).
*
* <p>The statement is assembled by string concatenation: the aspect column names are joined into the SELECT
* list verbatim, and each URN is passed through {@code escapeReservedCharInUrn} and wrapped in single quotes.
* NOTE(review): column names are not escaped here, so they are presumably expected to come from a trusted,
* internally generated source — confirm at the call sites.
*
* <p>The table name is derived from the first URN in iteration order; all URNs must share its entity type.
*
* @param aspectColumnNames the set of aspect column names to select (e.g., "a_status", "a_ownership")
* @param urns the set of URNs to query
* @param includeSoftDeleted if true, omits deleted_ts IS NULL filter and includes deleted_ts in SELECT
* @param isTestMode whether to use the test table
* @return SQL string for the multi-aspect read
* @throws IllegalArgumentException if {@code urns} is empty, if {@code aspectColumnNames} is empty, or if the
* URNs do not all have the same entity type
*/
public static String createMultiAspectReadSql(@Nonnull Set<String> aspectColumnNames,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how too we avoid sql injection here?

@Nonnull Set<Urn> urns, boolean includeSoftDeleted, boolean isTestMode) {
// Reject empty inputs up front: an empty IN list or an empty column list cannot form a query.
if (urns.isEmpty()) {
throw new IllegalArgumentException("Need at least 1 urn to query.");
}
if (aspectColumnNames.isEmpty()) {
throw new IllegalArgumentException("Need at least 1 aspect column to query.");
}

// A single statement reads from a single table, and that table is resolved from the first URN only,
// so every other URN must have the same entity type as the first.
final Urn firstUrn = urns.iterator().next();
final String firstEntityType = firstUrn.getEntityType();
if (urns.stream().anyMatch(u -> !u.getEntityType().equals(firstEntityType))) {
throw new IllegalArgumentException("All URNs must belong to the same entity type");
}
final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn);

// Build column list: urn, <aspect columns>, lastmodifiedon, lastmodifiedby, createdfor [, deleted_ts]
StringBuilder sb = new StringBuilder("SELECT urn, ");
sb.append(String.join(", ", aspectColumnNames));
sb.append(", lastmodifiedon, lastmodifiedby, createdfor");
if (includeSoftDeleted) {
sb.append(", deleted_ts");
}
sb.append(" FROM ").append(tableName);

// WHERE urn IN (...)
// Each URN string is escaped via escapeReservedCharInUrn before being quoted into the IN list.
String urnList = urns.stream()
.map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'")
.collect(Collectors.joining(", "));
sb.append(" WHERE urn IN (").append(urnList).append(")");

// Add deleted_ts filter when not including soft-deleted entities
if (!includeSoftDeleted) {
sb.append(" AND ").append(DELETED_TS_IS_NULL_CHECK);
}

return sb.toString();
}

/**
* List all the aspect record (0 or 1) for a given entity urn and aspect type.
* @param aspectClass aspect type
Expand Down
Loading
Loading