Skip to content

Commit ba7842d

Browse files
authored
fix(pit): reduce overhead of PIT creation (#15412)
1 parent 14093a3 commit ba7842d

File tree

10 files changed

+552
-169
lines changed

10 files changed

+552
-169
lines changed

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ SearchResponse executeSearch(@Nonnull SearchRequest searchRequest) {
9999
return delegate.executeSearch(searchRequest);
100100
}
101101

102+
/**
 * Cleans up the point-in-time (PIT) identified by {@code pitId} by forwarding the call to the
 * wrapped {@code delegate} DAO; this wrapper adds no cleanup logic of its own.
 *
 * @param pitId identifier of the PIT to clean up, passed through to the delegate unchanged
 */
@Override
103+
public void cleanupPointInTime(String pitId) {
104+
delegate.cleanupPointInTime(pitId);
105+
}
106+
102107
@Override
103108
public void destroy() throws Exception {
104109
// Shutdown the delegate if it's a GraphQueryPITDAO

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import lombok.extern.slf4j.Slf4j;
6161
import org.opensearch.action.search.SearchRequest;
6262
import org.opensearch.action.search.SearchResponse;
63+
import org.opensearch.common.unit.TimeValue;
6364
import org.opensearch.index.query.BoolQueryBuilder;
6465
import org.opensearch.index.query.QueryBuilder;
6566
import org.opensearch.index.query.QueryBuilders;
@@ -370,9 +371,22 @@ public RelatedEntitiesScrollResult scrollRelatedEntities(
370371
SearchHit[] searchHits = response.getHits().getHits();
371372
// Only return next scroll ID if there are more results, indicated by full size results
372373
String nextScrollId = null;
374+
String pitId = response.pointInTimeId();
373375
if (searchHits.length == count && searchHits.length > 0) {
374376
Object[] sort = searchHits[searchHits.length - 1].getSortValues();
375-
nextScrollId = new SearchAfterWrapper(sort, null, 0L).toScrollId();
377+
if (pitId != null && keepAlive == null) {
378+
throw new IllegalArgumentException("Should not set pitId without keepAlive");
379+
}
380+
long expirationTime =
381+
keepAlive == null
382+
? 0L
383+
: System.currentTimeMillis()
384+
+ TimeValue.parseTimeValue(keepAlive, "keepAlive").millis();
385+
nextScrollId = new SearchAfterWrapper(sort, pitId, expirationTime).toScrollId();
386+
}
387+
if (nextScrollId == null && pitId != null) {
388+
// Last scroll, we clean up the pitId assuming user has gone through all data
389+
graphReadDAO.cleanupPointInTime(pitId);
376390
}
377391

378392
return RelatedEntitiesScrollResult.builder()

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryBaseDAO.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,4 +1664,9 @@ protected List<LineageRelationship> processSliceFutures(
16641664

16651665
return allRelationships;
16661666
}
1667+
1668+
/**
 * Cleans up the point-in-time (PIT) identified by {@code pitId} via
 * {@code ESUtils.cleanupPointInTime}, using the client returned by {@code getClient()}.
 *
 * @param pitId identifier of the PIT to clean up
 */
@Override
1669+
public void cleanupPointInTime(String pitId) {
1670+
// NOTE(review): "API Request" is presumably a context label used by ESUtils for logging —
// confirm against ESUtils.cleanupPointInTime.
ESUtils.cleanupPointInTime(getClient(), pitId, "API Request");
1671+
}
16671672
}

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryDAO.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,6 @@ SearchResponse getSearchResponse(
4747
@Nullable String scrollId,
4848
@Nullable String keepAlive,
4949
@Nullable Integer count);
50+
51+
/**
 * Cleans up the point-in-time (PIT) identified by {@code pitId}.
 *
 * <p>NOTE(review): whether a {@code null} or already-expired id is tolerated is up to the
 * implementation — confirm before relying on it.
 *
 * @param pitId identifier of the PIT to clean up
 */
void cleanupPointInTime(String pitId);
5052
}

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphQueryPITDAO.java

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -118,37 +118,53 @@ protected List<LineageRelationship> searchWithSlices(
118118
boolean allowPartialResults) {
119119

120120
// Create slice-based search requests
121-
List<CompletableFuture<List<LineageRelationship>>> sliceFutures = new ArrayList<>();
121+
String pitId = null;
122+
String keepAlive = config.getSearch().getGraph().getImpact().getKeepAlive();
123+
try {
124+
List<CompletableFuture<List<LineageRelationship>>> sliceFutures = new ArrayList<>();
125+
pitId =
126+
ESUtils.computePointInTime(
127+
null,
128+
keepAlive,
129+
client,
130+
opContext.getSearchContext().getIndexConvention().getIndexName(INDEX_NAME));
131+
final String tempPitId = pitId;
122132

123-
for (int sliceId = 0; sliceId < slices; sliceId++) {
124-
final int currentSliceId = sliceId;
133+
for (int sliceId = 0; sliceId < slices; sliceId++) {
134+
final int currentSliceId = sliceId;
125135

126-
CompletableFuture<List<LineageRelationship>> sliceFuture =
127-
CompletableFuture.supplyAsync(
128-
() -> {
129-
return searchSingleSliceWithPit(
130-
opContext,
131-
query,
132-
lineageGraphFilters,
133-
visitedEntities,
134-
viaEntities,
135-
numHops,
136-
remainingHops,
137-
existingPaths,
138-
maxRelations,
139-
defaultPageSize,
140-
currentSliceId,
141-
slices,
142-
remainingTime,
143-
entityUrns,
144-
allowPartialResults);
145-
},
146-
pitExecutor); // Use dedicated thread pool with CallerRunsPolicy for backpressure
147-
sliceFutures.add(sliceFuture);
148-
}
136+
CompletableFuture<List<LineageRelationship>> sliceFuture =
137+
CompletableFuture.supplyAsync(
138+
() -> {
139+
return searchSingleSliceWithPit(
140+
opContext,
141+
query,
142+
lineageGraphFilters,
143+
visitedEntities,
144+
viaEntities,
145+
numHops,
146+
remainingHops,
147+
existingPaths,
148+
maxRelations,
149+
defaultPageSize,
150+
currentSliceId,
151+
slices,
152+
remainingTime,
153+
entityUrns,
154+
allowPartialResults,
155+
tempPitId,
156+
keepAlive);
157+
},
158+
pitExecutor); // Use dedicated thread pool with CallerRunsPolicy for backpressure
159+
sliceFutures.add(sliceFuture);
160+
}
149161

150-
// Reuse the common slice coordination logic
151-
return processSliceFutures(sliceFutures, remainingTime, allowPartialResults);
162+
// Reuse the common slice coordination logic
163+
return processSliceFutures(sliceFutures, remainingTime, allowPartialResults);
164+
} finally {
165+
// Clean up PIT to prevent hitting the limit
166+
ESUtils.cleanupPointInTime(client, pitId, "lineage search: " + entityUrns);
167+
}
152168
}
153169

154170
/**
@@ -173,22 +189,14 @@ private List<LineageRelationship> searchSingleSliceWithPit(
173189
int totalSlices,
174190
long remainingTime,
175191
Set<Urn> entityUrns,
176-
boolean allowPartialResults) {
192+
boolean allowPartialResults,
193+
String pitId,
194+
String keepAlive) {
177195

178196
List<LineageRelationship> sliceRelationships = new ArrayList<>();
179-
String pitId = null;
180197
Object[] searchAfter = null;
181-
String keepAlive = config.getSearch().getGraph().getImpact().getKeepAlive();
182198

183199
try {
184-
// Create initial PIT using existing utility method
185-
pitId =
186-
ESUtils.computePointInTime(
187-
null,
188-
keepAlive,
189-
client,
190-
opContext.getSearchContext().getIndexConvention().getIndexName(INDEX_NAME));
191-
192200
// If maxRelations is -1 or 0, treat as unlimited (only bound by time)
193201
while (maxRelations <= 0 || sliceRelationships.size() < maxRelations) {
194202
// Check for thread interruption (from future.cancel(true))
@@ -293,9 +301,6 @@ private List<LineageRelationship> searchSingleSliceWithPit(
293301
} catch (Exception e) {
294302
log.error("Failed to execute PIT search for slice {}", sliceId, e);
295303
throw new RuntimeException("Failed to execute PIT search for slice " + sliceId, e);
296-
} finally {
297-
// Clean up PIT to prevent hitting the limit
298-
ESUtils.cleanupPointInTime(client, pitId, "slice " + sliceId);
299304
}
300305

301306
return sliceRelationships;

metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
import static com.linkedin.metadata.search.elasticsearch.query.request.SearchFieldConfig.KEYWORD_FIELDS;
1111
import static com.linkedin.metadata.search.elasticsearch.query.request.SearchFieldConfig.PATH_HIERARCHY_FIELDS;
1212
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
13+
import static org.opensearch.core.rest.RestStatus.TOO_MANY_REQUESTS;
1314

1415
import com.fasterxml.jackson.core.type.TypeReference;
1516
import com.google.common.collect.ImmutableList;
1617
import com.linkedin.data.schema.DataSchema;
1718
import com.linkedin.data.schema.MapDataSchema;
1819
import com.linkedin.data.schema.PathSpec;
1920
import com.linkedin.metadata.aspect.AspectRetriever;
21+
import com.linkedin.metadata.dao.throttle.APIThrottleException;
2022
import com.linkedin.metadata.models.EntitySpec;
2123
import com.linkedin.metadata.models.SearchableFieldSpec;
2224
import com.linkedin.metadata.models.StructuredPropertyUtils;
@@ -53,6 +55,7 @@
5355
import javax.annotation.Nullable;
5456
import lombok.extern.slf4j.Slf4j;
5557
import org.apache.commons.lang3.StringUtils;
58+
import org.opensearch.OpenSearchStatusException;
5659
import org.opensearch.action.search.CreatePitRequest;
5760
import org.opensearch.action.search.CreatePitResponse;
5861
import org.opensearch.action.search.DeletePitRequest;
@@ -1447,6 +1450,20 @@ public static QueryBuilder queryOptimize(QueryBuilder query, boolean considerSco
14471450
new CreatePitRequest(TimeValue.parseTimeValue(keepAlive, "keepAlive"), false, indexArray);
14481451
CreatePitResponse response = client.createPit(request, RequestOptions.DEFAULT);
14491452
return response.getId();
1453+
} catch (OpenSearchStatusException ose) {
1454+
if (TOO_MANY_REQUESTS.equals(ose.status())) {
1455+
APIThrottleException throttleException =
1456+
new APIThrottleException(
1457+
TimeValue.parseTimeValue(keepAlive, "keepAlive").millis(),
1458+
"Too many point in times created, retry after keep alive has expired.");
1459+
try {
1460+
throttleException.initCause(ose);
1461+
} catch (IllegalStateException | IllegalArgumentException e) {
1462+
// Do nothing, can't fill in cause
1463+
}
1464+
throw throttleException;
1465+
}
1466+
throw ose;
14501467
} catch (IOException e) {
14511468
log.warn("Failed to generate PointInTime Identifier:", e);
14521469
throw new IllegalStateException("Failed to generate PointInTime Identifier.", e);

metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.linkedin.metadata.graph.elastic;
22

3+
import static org.mockito.ArgumentMatchers.any;
4+
import static org.mockito.ArgumentMatchers.argThat;
35
import static org.mockito.ArgumentMatchers.eq;
46
import static org.mockito.ArgumentMatchers.isNull;
57
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.times;
69
import static org.mockito.Mockito.verify;
710
import static org.mockito.Mockito.when;
811
import static org.testng.Assert.assertEquals;
@@ -24,6 +27,7 @@
2427
import com.linkedin.metadata.utils.metrics.MetricUtils;
2528
import io.datahubproject.metadata.context.OperationContext;
2629
import io.datahubproject.test.metadata.context.TestOperationContexts;
30+
import java.io.IOException;
2731
import java.util.ArrayList;
2832
import java.util.Arrays;
2933
import java.util.List;
@@ -523,4 +527,20 @@ public void testDestroyWithNullDelegate() throws Exception {
523527

524528
// Test passes if no exception is thrown
525529
}
530+
531+
/**
 * Checks that {@code ESGraphQueryDAO.cleanupPointInTime} reaches the search client: calling it
 * with a PIT id must produce exactly one {@code deletePit} request that contains that id.
 */
@Test
532+
public void testCleanupPointInTime() throws IOException {
533+
SearchClientShim<?> mockClient = mock(SearchClientShim.class);
534+
535+
// Stub the engine type as OPENSEARCH_2 before the DAO is constructed with this client
when(mockClient.getEngineType()).thenReturn(SearchClientShim.SearchEngineType.OPENSEARCH_2);
536+
ESGraphQueryDAO dao =
537+
new ESGraphQueryDAO(
538+
mockClient, mockGraphServiceConfig, mockElasticSearchConfig, mockMetricUtils);
539+
540+
String pitId = "test-pit-id";
541+
dao.cleanupPointInTime(pitId);
542+
543+
// Verify the cleanup reached the client: exactly one deletePit call whose request contains
// the PIT id (the second argument is matched with any())
544+
verify(mockClient, times(1)).deletePit(argThat(req -> req.getPitIds().contains(pitId)), any());
545+
}
526546
}

0 commit comments

Comments
 (0)