Commit 352b3f6

fix(search): prevent ES8 reindex loops and normalize legacy range queries (#17641)
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent cd3ccef commit 352b3f6

14 files changed

Lines changed: 749 additions & 28 deletions

docs/how/updating-datahub.md

Lines changed: 3 additions & 1 deletion
@@ -170,9 +170,11 @@ Requirements:

 ### Known Issues

+- **(Operations / Elasticsearch 8) Spurious mappings reindex on every Helm upgrade.** Deployments backed by **Elasticsearch 8.x** may trigger a full index mappings reindex on **every** system-update run (including each Helm upgrade), even when no authored mapping change was shipped. ES8 persists mappings differently than DataHub's authored definitions (for example injecting `type: object` on nested object fields), so the index builder detects spurious drift and schedules reindex work repeatedly. This is most likely when **`ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true`** (including Docker quickstart, which enables it for initial index builds). **Workaround:** Let the **first** system-update after install or upgrade finish so indices are created, then set **`ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=false`** on **`datahub-gms`** and **`datahub-upgrade`** before later Helm upgrades until a fixed release is available.
+
 ### Potential Downtime

-- **System-update / Elasticsearch:** Upgrades that trigger reindexing, incremental index migration, or optional Elasticsearch ZDU (#16887) can run for an extended period depending on catalog size. Reindex duration was improved (#16949) and a perpetual ES8 reindex loop fix was included (#17402). Helm upgrades without `--atomic` may report failure while background reindex jobs continue — allow system-update to finish before retrying. See **Helm Notes** under the [0.10.0](#0100) section below for timeout guidance patterns.
+- **System-update / Elasticsearch:** Upgrades that trigger reindexing, incremental index migration, or optional Elasticsearch ZDU (#16887) can run for an extended period depending on catalog size. Reindex duration was improved (#16949). Elasticsearch 8 deployments with mappings reindex enabled may repeat unnecessary reindex work on every Helm upgrade — see **Known Issues** above. Helm upgrades without `--atomic` may report failure while background reindex jobs continue — allow system-update to finish before retrying. See **Helm Notes** under the [0.10.0](#0100) section below for timeout guidance patterns.
 - **First deploy after bootstrap moves:** Entity Types ingestion moved to system-update (see Breaking Changes). Expect additional system-update work on the first upgrade after this release.
 - **Aspect schema version sweep (#16930):** Large deployments may observe background migration activity during upgrade; plan maintenance windows accordingly.

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/client/shim/impl/Es8SearchClientShim.java

Lines changed: 69 additions & 11 deletions
@@ -90,6 +90,7 @@
 import com.linkedin.metadata.search.elasticsearch.client.shim.builder.es8.Es8SemanticIndexSettingsBuilder;
 import com.linkedin.metadata.search.elasticsearch.client.shim.impl.v8.CustomQuery;
 import com.linkedin.metadata.search.elasticsearch.client.shim.impl.v8.Es8BulkListener;
+import com.linkedin.metadata.search.elasticsearch.client.shim.impl.v8.LegacyRangeQueryNormalizer;
 import com.linkedin.metadata.utils.elasticsearch.responses.GetIndexResponse;
 import com.linkedin.metadata.utils.elasticsearch.responses.RawResponse;
 import com.linkedin.metadata.utils.elasticsearch.shim.EmbeddingBatch;
@@ -105,6 +106,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +244,22 @@
 public class Es8SearchClientShim extends AbstractBulkProcessorShim<BulkIngester<?>>
     implements ElasticSearchClientShim<ElasticsearchClient> {

+  /**
+   * ES8+ silently strips {@code doc_values: false} from {@code search_as_you_type} fields on
+   * round-trip. Including it in the authored mapping creates a permanent diff against what the
+   * cluster returns and triggers a reindex on every system update cycle, so we omit it here.
+   */
+  public static final Map<String, String> PARTIAL_NGRAM_CONFIG =
+      ImmutableMap.of(
+          "type", "search_as_you_type",
+          "max_shingle_size", "4");
+
+  /**
+   * ES8 injects {@code type: custom} on custom analyzers when settings are persisted, but authored
+   * V2 index settings omit {@code type} on analyzer definitions.
+   */
+  public static final String INJECTED_CUSTOM_ANALYZER_TYPE = "custom";
+
   @Getter private final ShimConfiguration shimConfiguration;
   private final SearchEngineType engineType;
   private final ElasticsearchClient client;
@@ -1424,22 +1442,54 @@ public SearchEngineType getEngineType() {
     return engineType;
   }

-  /**
-   * ES8+ silently strips {@code doc_values: false} from {@code search_as_you_type} fields on
-   * round-trip. Including it in the authored mapping creates a permanent diff against what the
-   * cluster returns and triggers a reindex on every system update cycle, so we omit it here.
-   */
-  public static final Map<String, String> PARTIAL_NGRAM_CONFIG =
-      ImmutableMap.of(
-          "type", "search_as_you_type",
-          "max_shingle_size", "4");
+  /** ES8-specific index analysis settings comparison rules for reindex detection. */
+  public static final class IndexSettingsComparison {
+    private IndexSettingsComparison() {}
+
+    @Nonnull
+    public static Set<String> storedNamesForComparison(
+        @Nonnull Map<String, Object> targetSettings, @Nonnull Settings storedSettings) {
+      Set<String> names = new HashSet<>(storedSettings.names());
+      if (!targetSettings.containsKey("type") && names.remove("type")) {
+        String typeValue = storedSettings.get("type");
+        if (typeValue == null || !INJECTED_CUSTOM_ANALYZER_TYPE.equalsIgnoreCase(typeValue)) {
+          names.add("type");
+        }
+      }
+      return names;
+    }
+
+    public static boolean valuesEqual(@Nullable Object targetValue, @Nullable String storedValue) {
+      if (com.linkedin.metadata.utils.elasticsearch.IndexSettingsComparison.Strict.INSTANCE
+          .indexSettingValuesEqual(targetValue, storedValue)) {
+        return true;
+      }
+      if (targetValue == null || storedValue == null) {
+        return false;
+      }
+      return targetValue.toString().equalsIgnoreCase(storedValue);
+    }
+  }

   @Nonnull
   @Override
   public Map<String, String> partialNgramConfig() {
     return PARTIAL_NGRAM_CONFIG;
   }

+  @Nonnull
+  @Override
+  public Set<String> indexSettingNamesForComparison(
+      @Nonnull Map<String, Object> targetSettings, @Nonnull Settings storedSettings) {
+    return IndexSettingsComparison.storedNamesForComparison(targetSettings, storedSettings);
+  }
+
+  @Override
+  public boolean indexSettingValuesEqual(
+      @Nullable Object targetValue, @Nullable String storedValue) {
+    return IndexSettingsComparison.valuesEqual(targetValue, storedValue);
+  }
+
   @Nonnull
   @Override
   public String getEngineVersion() throws IOException {
@@ -1808,7 +1858,7 @@ private Query convertQuery(org.opensearch.index.query.QueryBuilder osQuery) {
     if (osQuery == null) {
       return null;
     }
-    String jsonString = osQuery.toString();
+    String jsonString = normalizeQueryJson(osQuery.toString());
     return Query.of(
         q ->
             q.withJson(
@@ -1817,14 +1867,22 @@ private Query convertQuery(org.opensearch.index.query.QueryBuilder osQuery) {
   }

   private Rescore convertRescore(RescorerBuilder<?> rescorerBuilder) {
-    String jsonString = rescorerBuilder.toString();
+    String jsonString = normalizeQueryJson(rescorerBuilder.toString());
     return Rescore.of(
         q ->
             q.withJson(
                 jacksonJsonpMapper.jsonProvider().createParser(new StringReader(jsonString)),
                 jacksonJsonpMapper));
   }

+  private String normalizeQueryJson(String jsonString) {
+    try {
+      return LegacyRangeQueryNormalizer.normalize(jsonString, objectMapper);
+    } catch (JsonProcessingException e) {
+      return jsonString;
+    }
+  }
+
   private FieldSuggester convertSuggestion(SuggestionBuilder<?> suggestionBuilder) {
     String jsonString = Strings.toString(MediaTypeRegistry.JSON, suggestionBuilder, true, true);
     return FieldSuggester.of(
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+package com.linkedin.metadata.search.elasticsearch.client.shim.impl.v8;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import javax.annotation.Nonnull;
+
+/**
+ * Rewrites legacy OpenSearch {@code RangeQueryBuilder} JSON ({@code from}/{@code to} with {@code
+ * include_lower}/{@code include_upper}) into ES 8-compatible bounds ({@code gte}/{@code gt}/{@code
+ * lte}/{@code lt}). OpenSearch HLRC {@code QueryBuilder#toString()} still emits the legacy shape,
+ * which Elasticsearch 8.18+ deprecates.
+ */
+public final class LegacyRangeQueryNormalizer {
+
+  private LegacyRangeQueryNormalizer() {}
+
+  @Nonnull
+  public static String normalize(@Nonnull String queryJson, @Nonnull ObjectMapper objectMapper)
+      throws JsonProcessingException {
+    JsonNode root = objectMapper.readTree(queryJson);
+    normalizeRangeNodes(root);
+    return objectMapper.writeValueAsString(root);
+  }
+
+  private static void normalizeRangeNodes(JsonNode node) {
+    if (node == null) {
+      return;
+    }
+    if (node.isObject()) {
+      ObjectNode objectNode = (ObjectNode) node;
+      if (objectNode.has("range") && objectNode.get("range").isObject()) {
+        ObjectNode rangeNode = (ObjectNode) objectNode.get("range");
+        rangeNode
+            .properties()
+            .forEach(
+                entry -> {
+                  if (entry.getValue().isObject()) {
+                    normalizeRangeSpec((ObjectNode) entry.getValue());
+                  }
+                });
+      }
+      objectNode.properties().forEach(entry -> normalizeRangeNodes(entry.getValue()));
+    } else if (node.isArray()) {
+      node.forEach(LegacyRangeQueryNormalizer::normalizeRangeNodes);
+    }
+  }
+
+  private static void normalizeRangeSpec(ObjectNode spec) {
+    if (!spec.has("from") && !spec.has("to")) {
+      return;
+    }
+
+    JsonNode from = spec.get("from");
+    JsonNode to = spec.get("to");
+    boolean includeLower = !spec.has("include_lower") || spec.get("include_lower").asBoolean(true);
+    boolean includeUpper = !spec.has("include_upper") || spec.get("include_upper").asBoolean(true);
+
+    if (from != null && !from.isNull()) {
+      spec.set(includeLower ? "gte" : "gt", from);
+    }
+    if (to != null && !to.isNull()) {
+      spec.set(includeUpper ? "lte" : "lt", to);
+    }
+
+    spec.remove("from");
+    spec.remove("to");
+    spec.remove("include_lower");
+    spec.remove("include_upper");
+  }
+}
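
The bound rewrite performed by `normalizeRangeSpec` can be illustrated without Jackson. This is a minimal sketch and not code from the commit: `RangeBoundsSketch` and `rewrite` are hypothetical names, and plain `Map`s stand in for `ObjectNode`. It assumes, as the diff above does, that a missing `include_lower` or `include_upper` flag means the bound is inclusive.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone illustration of the from/to rewrite; not part of the commit. */
final class RangeBoundsSketch {

  private RangeBoundsSketch() {}

  /** Returns a copy of a range spec with legacy from/to keys replaced by gte/gt/lte/lt. */
  static Map<String, Object> rewrite(Map<String, Object> spec) {
    Map<String, Object> out = new LinkedHashMap<>(spec);
    if (!out.containsKey("from") && !out.containsKey("to")) {
      return out; // nothing legacy to rewrite
    }
    Object from = out.remove("from");
    Object to = out.remove("to");
    // Only an explicit false makes a bound exclusive; a missing flag is treated as inclusive.
    boolean includeLower = !Boolean.FALSE.equals(out.remove("include_lower"));
    boolean includeUpper = !Boolean.FALSE.equals(out.remove("include_upper"));
    if (from != null) {
      out.put(includeLower ? "gte" : "gt", from);
    }
    if (to != null) {
      out.put(includeUpper ? "lte" : "lt", to);
    }
    return out;
  }
}
```

Under these assumptions, a spec with `from=100`, `to=200`, `include_lower=true`, and `include_upper=false` comes back with only `gte=100` and `lt=200`; unrelated keys such as `boost` pass through unchanged.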

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java

Lines changed: 2 additions & 1 deletion
@@ -381,7 +381,8 @@ public ReindexConfig buildReindexState(
                 structPropConfig.isEnabled()
                     && structPropConfig.isSystemUpdateEnabled()
                     && !copyStructuredPropertyMappings)
-            .version(gitVersion.getVersion());
+            .version(gitVersion.getVersion())
+            .settingsComparisonShim(searchClient);

     Map<String, Object> baseSettings = new HashMap<>(settings);
     baseSettings.put(NUMBER_OF_SHARDS, indexConfig.getNumShards());

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ReindexConfig.java

Lines changed: 26 additions & 13 deletions
@@ -10,10 +10,13 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import com.linkedin.metadata.utils.elasticsearch.IndexSettingsComparison;
+import com.linkedin.metadata.utils.elasticsearch.SearchClientShim;
 import com.linkedin.util.Pair;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nonnull;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.experimental.Accessors;
@@ -143,6 +146,15 @@ private ReindexConfigBuilder hasRemovedStructuredProperty(boolean ignored) {
       return this;
     }

+    @Nonnull
+    private IndexSettingsComparison settingsComparison = IndexSettingsComparison.Strict.INSTANCE;
+
+    public ReindexConfigBuilder settingsComparisonShim(
+        @Nonnull SearchClientShim<?> settingsComparisonShim) {
+      this.settingsComparison = settingsComparisonShim;
+      return this;
+    }
+
     // ensure sorted
     public ReindexConfigBuilder currentMappings(Map<String, Object> currentMappings) {
       this.currentMappings = sortMap(currentMappings);
@@ -446,7 +458,7 @@ private boolean isAnalysisEqual() {
       // Compare analysis section
       Map<String, Object> newAnalysis = (Map<String, Object>) indexSettings.get("analysis");
       Settings oldAnalysis = super.currentSettings.getByPrefix("index.analysis.");
-      return equalsGroup(newAnalysis, oldAnalysis);
+      return equalsGroup(newAnalysis, oldAnalysis, super.settingsComparison);
     }

     private boolean isSettingsEqual() {
@@ -504,7 +516,8 @@ private boolean isSettingsReindexRequired() {
       return (indexSettings.containsKey("analysis")
           && !equalsGroup(
               (Map<String, Object>) indexSettings.get("analysis"),
-              super.currentSettings.getByPrefix("index.analysis.")));
+              super.currentSettings.getByPrefix("index.analysis."),
+              super.settingsComparison));
     }

     /**
@@ -690,8 +703,13 @@ private static Map<String, Object> removeKeys(
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }

-  private static boolean equalsGroup(Map<String, Object> newSettings, Settings oldSettings) {
-    if (!newSettings.keySet().equals(oldSettings.names())) {
+  private static boolean equalsGroup(
+      Map<String, Object> newSettings,
+      Settings oldSettings,
+      IndexSettingsComparison settingsComparison) {
+    Set<String> oldNames =
+        settingsComparison.indexSettingNamesForComparison(newSettings, oldSettings);
+    if (!newSettings.keySet().equals(oldNames)) {
       return false;
     }

@@ -703,7 +721,9 @@ private static boolean equalsGroup(Map<String, Object> newSettings, Settings old
       }
       if (newSettings.get(key) instanceof Map) {
         if (!equalsGroup(
-            (Map<String, Object>) newSettings.get(key), oldSettings.getByPrefix(key + "."))) {
+            (Map<String, Object>) newSettings.get(key),
+            oldSettings.getByPrefix(key + "."),
+            settingsComparison)) {
           return false;
         }
       } else if (newSettings.get(key) instanceof List) {
@@ -713,14 +733,7 @@ private static boolean equalsGroup(Map<String, Object> newSettings, Settings old
       } else {
         String oldValue = oldSettings.get(key);
         Object newValue = newSettings.get(key);
-        // Handle null values properly
-        if (newValue == null && oldValue == null) {
-          continue;
-        }
-        if (newValue == null || oldValue == null) {
-          return false;
-        }
-        if (!newValue.toString().equals(oldValue)) {
+        if (!settingsComparison.indexSettingValuesEqual(newValue, oldValue)) {
           return false;
         }
       }
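
The relaxed comparison that `equalsGroup` now delegates to can be sketched in isolation. This is a minimal illustration under stated assumptions, not the commit's implementation: `AnalyzerTypeDriftSketch` and its methods are hypothetical, a plain `Map<String, String>` stands in for OpenSearch `Settings`, only flat (non-nested) groups are handled, and the strict comparison is assumed to be null-safe string equality, as in the removed lines above.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-alone illustration; plain collections stand in for OpenSearch Settings. */
final class AnalyzerTypeDriftSketch {

  private AnalyzerTypeDriftSketch() {}

  /** Stored setting names, minus a "type" key that looks injected by the cluster. */
  static Set<String> storedNamesForComparison(
      Map<String, Object> target, Map<String, String> stored) {
    Set<String> names = new HashSet<>(stored.keySet());
    // Ignore "type" only when the authored settings omit it and the stored value is "custom".
    boolean injected =
        !target.containsKey("type") && "custom".equalsIgnoreCase(stored.get("type"));
    if (injected) {
      names.remove("type");
    }
    return names;
  }

  /** Null-safe comparison that tolerates case differences between authored and stored values. */
  static boolean valuesEqual(Object target, String stored) {
    if (target == null || stored == null) {
      return target == null && stored == null;
    }
    return target.toString().equalsIgnoreCase(stored);
  }

  /** True when a flat authored group matches the stored group under the relaxed rules. */
  static boolean groupEquals(Map<String, Object> target, Map<String, String> stored) {
    if (!target.keySet().equals(storedNamesForComparison(target, stored))) {
      return false;
    }
    for (Map.Entry<String, Object> entry : target.entrySet()) {
      if (!valuesEqual(entry.getValue(), stored.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }
}
```

With this sketch, an authored analyzer that omits `type` compares equal to a stored one carrying `type=custom`, so no reindex would be flagged, while a stored `type=pattern` still counts as a real difference.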

metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/client/shim/Es8SearchClientShimConversionTest.java

Lines changed: 31 additions & 0 deletions
@@ -5,10 +5,12 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;

 import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
 import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
 import co.elastic.clients.elasticsearch.core.search.FieldSuggester;
 import co.elastic.clients.elasticsearch.indices.update_aliases.Action;
@@ -25,6 +27,7 @@
 import org.opensearch.common.xcontent.LoggingDeprecationHandler;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.search.builder.SearchSourceBuilder;
 import org.opensearch.search.suggest.Suggest;
@@ -353,6 +356,34 @@ private SearchResponse parseSearchResponse(String json) throws Exception {
     }
   }

+  /** convertQuery rewrites legacy range bounds before sending to Elasticsearch 8. */
+  @Test
+  public void testConvertQueryNormalizesLegacyRangeBounds() throws Exception {
+    Method normalizeQueryJsonMethod =
+        Es8SearchClientShim.class.getDeclaredMethod("normalizeQueryJson", String.class);
+    normalizeQueryJsonMethod.setAccessible(true);
+
+    String legacy = QueryBuilders.rangeQuery("timestamp").gte(100L).lt(200L).toString();
+    String normalized = (String) normalizeQueryJsonMethod.invoke(shim, legacy);
+
+    assertFalse(normalized.contains("\"from\""));
+    assertFalse(normalized.contains("\"to\""));
+    assertTrue(normalized.contains("\"gte\":100"));
+    assertTrue(normalized.contains("\"lt\":200"));
+
+    Query result = invokeConvertQuery(QueryBuilders.rangeQuery("timestamp").gte(100L).lt(200L));
+    assertNotNull(result);
+  }
+
+  /** Helper method to invoke the private convertQuery method via reflection. */
+  private Query invokeConvertQuery(QueryBuilder queryBuilder) throws Exception {
+    Method convertQueryMethod =
+        Es8SearchClientShim.class.getDeclaredMethod(
+            "convertQuery", org.opensearch.index.query.QueryBuilder.class);
+    convertQueryMethod.setAccessible(true);
+    return (Query) convertQueryMethod.invoke(shim, queryBuilder);
+  }
+
   /** Helper method to invoke the private convertSuggestion method via reflection. */
   private FieldSuggester invokeConvertSuggestion(SuggestionBuilder<?> suggestionBuilder)
       throws Exception {
