Skip to content

Commit 31ed631

Browse files
authored
fix(domains): cross-check OpenSearch results against primary store in hasChildDomains() (#17732)
DeleteDomainResolver blocked parent deletion even after all children were removed. hasChildDomains() queried OpenSearch (eventually consistent) -- recently-deleted children could still appear there while already gone from MySQL. Cross-check each OpenSearch candidate via filterExistingUrns(), a batched MySQL read. Discard stale hits; only block if MySQL confirms at least one child still exists. Fall back to blocking when OpenSearch reports more candidates than the fetched page, to avoid a false allow on large hierarchies. Smoke test reproduces the race by firing child and parent deletes with no consistency sleep between them.
1 parent 2880231 commit 31ed631

3 files changed

Lines changed: 220 additions & 14 deletions

File tree

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,36 @@ public static boolean hasChildDomains(
201201
@Nonnull final EntityClient entityClient)
202202
throws RemoteInvocationException {
203203
Filter parentDomainFilter = buildParentDomainFilter(domainUrn);
204-
// Search for entities matching parent domain
205-
// Limit count to 1 for existence check
204+
// Fetch candidate child domains from OpenSearch. OpenSearch is eventually
205+
// consistent -- a recently-deleted child may still appear here.
206+
final int pageSize = 200;
206207
final SearchResult searchResult =
207208
entityClient.filter(
208-
context.getOperationContext(), DOMAIN_ENTITY_NAME, parentDomainFilter, null, 0, 1);
209-
return (searchResult.getNumEntities() > 0);
209+
context.getOperationContext(),
210+
DOMAIN_ENTITY_NAME,
211+
parentDomainFilter,
212+
null,
213+
0,
214+
pageSize);
215+
// Cross-check each candidate against the primary store (MySQL) which is
216+
// strongly consistent. Discard stale OpenSearch entries whose deletions
217+
// have not yet been indexed. Only block deletion if at least one child
218+
// still exists in the primary store.
219+
final Set<Urn> candidateUrns =
220+
searchResult.getEntities().stream()
221+
.map(SearchEntity::getEntity)
222+
.collect(Collectors.toSet());
223+
if (candidateUrns.isEmpty()) {
224+
return false;
225+
}
226+
if (!entityClient.filterExistingUrns(context.getOperationContext(), candidateUrns).isEmpty()) {
227+
return true;
228+
}
229+
// If OpenSearch reported more candidates than we fetched in this page, we
230+
// cannot confirm the domain is childless without checking the remaining
231+
// entries. Fall back to treating the domain as having children to prevent
232+
// accidental deletion when the index is stale on a large hierarchy.
233+
return searchResult.getNumEntities() > searchResult.getEntities().size();
210234
}
211235

212236
private static Map<Urn, EntityResponse> getDomainsByNameAndParent(

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/DeleteDomainResolverTest.java

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,39 +5,44 @@
55
import static org.testng.Assert.*;
66

77
import com.linkedin.common.urn.Urn;
8+
import com.linkedin.common.urn.UrnUtils;
89
import com.linkedin.datahub.graphql.QueryContext;
910
import com.linkedin.entity.client.EntityClient;
11+
import com.linkedin.metadata.search.SearchEntity;
12+
import com.linkedin.metadata.search.SearchEntityArray;
1013
import com.linkedin.metadata.search.SearchResult;
1114
import graphql.schema.DataFetchingEnvironment;
15+
import java.util.Collections;
16+
import java.util.Set;
1217
import java.util.concurrent.CompletionException;
1318
import org.mockito.Mockito;
1419
import org.testng.annotations.Test;
1520

1621
public class DeleteDomainResolverTest {
1722

1823
private static final String TEST_URN = "urn:li:domain:test-id";
24+
private static final String CHILD_URN = "urn:li:domain:child-id";
1925

2026
@Test
2127
public void testGetSuccess() throws Exception {
2228
EntityClient mockClient = Mockito.mock(EntityClient.class);
2329
DeleteDomainResolver resolver = new DeleteDomainResolver(mockClient);
2430

25-
// Execute resolver
2631
QueryContext mockContext = getMockAllowContext();
2732
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
2833
Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_URN);
2934
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
3035

31-
// Domain has 0 child domains
36+
// Domain has 0 child domains -- early exit before filterExistingUrns.
3237
Mockito.when(
3338
mockClient.filter(
3439
any(),
3540
Mockito.eq("domain"),
3641
Mockito.any(),
3742
Mockito.any(),
3843
Mockito.eq(0),
39-
Mockito.eq(1)))
40-
.thenReturn(new SearchResult().setNumEntities(0));
44+
Mockito.eq(200)))
45+
.thenReturn(new SearchResult().setNumEntities(0).setEntities(new SearchEntityArray()));
4146

4247
assertTrue(resolver.get(mockEnv).get());
4348

@@ -50,35 +55,117 @@ public void testDeleteWithChildDomains() throws Exception {
5055
EntityClient mockClient = Mockito.mock(EntityClient.class);
5156
DeleteDomainResolver resolver = new DeleteDomainResolver(mockClient);
5257

53-
// Execute resolver
5458
QueryContext mockContext = getMockAllowContext();
5559
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
5660
Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_URN);
5761
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
5862

59-
// Domain has child domains
63+
// OpenSearch returns one child candidate.
64+
Urn childUrn = UrnUtils.getUrn(CHILD_URN);
65+
SearchEntity childEntity = new SearchEntity().setEntity(childUrn);
6066
Mockito.when(
6167
mockClient.filter(
6268
any(),
6369
Mockito.eq("domain"),
6470
Mockito.any(),
6571
Mockito.any(),
6672
Mockito.eq(0),
67-
Mockito.eq(1)))
68-
.thenReturn(new SearchResult().setNumEntities(1));
73+
Mockito.eq(200)))
74+
.thenReturn(
75+
new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(childEntity)));
76+
77+
// Primary store (MySQL) confirms the child still exists.
78+
Mockito.when(mockClient.filterExistingUrns(any(), Mockito.eq(Set.of(childUrn))))
79+
.thenReturn(Set.of(childUrn));
6980

7081
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
7182

7283
Mockito.verify(mockClient, Mockito.times(0)).deleteEntity(any(), Mockito.any());
7384
}
7485

86+
@Test
87+
public void testDeleteBlockedWhenPagedCandidatesAllStaleButMoreExist() throws Exception {
88+
// When OpenSearch reports more total candidates than fit in one page and all
89+
// fetched candidates are stale in MySQL, deletion must still be blocked.
90+
// Without the fallback check (numEntities > entities.size()), the code would
91+
// incorrectly return false -- potentially allowing deletion of a domain that
92+
// still has real children in the un-fetched remainder of the OpenSearch result.
93+
EntityClient mockClient = Mockito.mock(EntityClient.class);
94+
DeleteDomainResolver resolver = new DeleteDomainResolver(mockClient);
95+
96+
QueryContext mockContext = getMockAllowContext();
97+
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
98+
Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_URN);
99+
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
100+
101+
// OpenSearch reports 300 total but only returns one entry in this page,
102+
// simulating the case where numEntities > the fetched page size.
103+
Urn childUrn = UrnUtils.getUrn(CHILD_URN);
104+
Mockito.when(
105+
mockClient.filter(
106+
any(),
107+
Mockito.eq("domain"),
108+
Mockito.any(),
109+
Mockito.any(),
110+
Mockito.eq(0),
111+
Mockito.eq(200)))
112+
.thenReturn(
113+
new SearchResult()
114+
.setNumEntities(300)
115+
.setEntities(new SearchEntityArray(new SearchEntity().setEntity(childUrn))));
116+
117+
// The fetched candidate is stale in MySQL.
118+
Mockito.when(mockClient.filterExistingUrns(any(), any())).thenReturn(Collections.emptySet());
119+
120+
// Must still block deletion: we cannot confirm childlessness from one page.
121+
assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join());
122+
Mockito.verify(mockClient, Mockito.times(0)).deleteEntity(any(), Mockito.any());
123+
}
124+
125+
@Test
126+
public void testDeleteWithStaleChildDomains() throws Exception {
127+
// Regression test for the OpenSearch eventual-consistency race condition:
128+
// OpenSearch still shows a child that was just deleted from MySQL.
129+
// hasChildDomains() must allow the parent delete to proceed once the
130+
// primary store (MySQL) confirms no child actually exists.
131+
EntityClient mockClient = Mockito.mock(EntityClient.class);
132+
DeleteDomainResolver resolver = new DeleteDomainResolver(mockClient);
133+
134+
QueryContext mockContext = getMockAllowContext();
135+
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
136+
Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_URN);
137+
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);
138+
139+
// OpenSearch returns a stale child candidate.
140+
Urn childUrn = UrnUtils.getUrn(CHILD_URN);
141+
SearchEntity childEntity = new SearchEntity().setEntity(childUrn);
142+
Mockito.when(
143+
mockClient.filter(
144+
any(),
145+
Mockito.eq("domain"),
146+
Mockito.any(),
147+
Mockito.any(),
148+
Mockito.eq(0),
149+
Mockito.eq(200)))
150+
.thenReturn(
151+
new SearchResult().setNumEntities(1).setEntities(new SearchEntityArray(childEntity)));
152+
153+
// Primary store (MySQL) confirms the child no longer exists -- stale OpenSearch hit.
154+
Mockito.when(mockClient.filterExistingUrns(any(), Mockito.eq(Set.of(childUrn))))
155+
.thenReturn(Collections.emptySet());
156+
157+
// Deletion should succeed because the only OpenSearch candidate is stale.
158+
assertTrue(resolver.get(mockEnv).get());
159+
160+
Mockito.verify(mockClient, Mockito.times(1))
161+
.deleteEntity(any(), Mockito.eq(Urn.createFromString(TEST_URN)));
162+
}
163+
75164
@Test
76165
public void testGetUnauthorized() throws Exception {
77-
// Create resolver
78166
EntityClient mockClient = Mockito.mock(EntityClient.class);
79167
DeleteDomainResolver resolver = new DeleteDomainResolver(mockClient);
80168

81-
// Execute resolver
82169
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
83170
Mockito.when(mockEnv.getArgument(Mockito.eq("urn"))).thenReturn(TEST_URN);
84171
QueryContext mockContext = getMockDenyContext();

smoke-test/tests/domains/domains_test.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import uuid
23
from typing import Any, Dict
34

45
import pytest
@@ -169,3 +170,97 @@ def test_set_unset_domain(auth_session, ingest_cleanup_data):
169170
res_data["data"]["dataset"]["domain"]["domain"]["properties"]["name"]
170171
== "Engineering"
171172
)
173+
174+
175+
_CREATE_DOMAIN_MUTATION = """
176+
mutation createDomain($input: CreateDomainInput!) {
177+
createDomain(input: $input)
178+
}
179+
"""
180+
181+
_DELETE_DOMAIN_MUTATION = """
182+
mutation deleteDomain($urn: String!) {
183+
deleteDomain(urn: $urn)
184+
}
185+
"""
186+
187+
188+
def test_delete_parent_domain_immediately_after_child_deletion(auth_session):
189+
"""
190+
A parent domain whose only child has just been deleted should be
191+
immediately deletable -- no sleep or page-refresh should be required.
192+
193+
Demonstrates a race condition in DomainUtils.hasChildDomains(): it
194+
queries OpenSearch (eventually consistent) rather than the primary
195+
store (MySQL). When a child is deleted and the parent delete follows
196+
immediately, OpenSearch may not yet have indexed the child's removal,
197+
causing the parent delete to be rejected with "Cannot delete domain
198+
which has child domains" even though the child is already gone.
199+
200+
The test bypasses TestSessionWrapper's post-mutation consistency sleep
201+
so both deletes fire back-to-back with no gap for OpenSearch to catch up.
202+
203+
References
204+
----------
205+
- DomainUtils.java: datahub-graphql-core/.../resolvers/mutate/util/DomainUtils.java
206+
- DeleteDomainResolver.java: datahub-graphql-core/.../resolvers/domain/DeleteDomainResolver.java
207+
"""
208+
run_id = uuid.uuid4().hex[:8]
209+
parent_id = f"test-domain-race-parent-{run_id}"
210+
child_id = f"test-domain-race-child-{run_id}"
211+
parent_urn = f"urn:li:domain:{parent_id}"
212+
child_urn = f"urn:li:domain:{child_id}"
213+
214+
try:
215+
res = execute_graphql(
216+
auth_session,
217+
_CREATE_DOMAIN_MUTATION,
218+
{"input": {"id": parent_id, "name": f"Race Test Parent {run_id}"}},
219+
)
220+
assert res["data"]["createDomain"] == parent_urn
221+
222+
res = execute_graphql(
223+
auth_session,
224+
_CREATE_DOMAIN_MUTATION,
225+
{
226+
"input": {
227+
"id": child_id,
228+
"name": f"Race Test Child {run_id}",
229+
"parentDomain": parent_urn,
230+
}
231+
},
232+
)
233+
assert res["data"]["createDomain"] == child_urn
234+
235+
# Delete child then immediately delete parent using the underlying
236+
# session directly, bypassing TestSessionWrapper's post-mutation
237+
# consistency sleep. This preserves the race window between the
238+
# child deletion (MySQL write) and the parent deletion attempt
239+
# (OpenSearch child-guard check).
240+
endpoint = f"{auth_session.frontend_url()}/api/v2/graphql"
241+
headers = {"Authorization": f"Bearer {auth_session.gms_token()}"}
242+
243+
def raw_graphql(query, variables):
244+
resp = auth_session._upstream.post(
245+
endpoint,
246+
json={"query": query, "variables": variables},
247+
headers=headers,
248+
)
249+
resp.raise_for_status()
250+
data = resp.json()
251+
assert "errors" not in data, f"GraphQL errors: {data.get('errors')}"
252+
return data
253+
254+
res = raw_graphql(_DELETE_DOMAIN_MUTATION, {"urn": child_urn})
255+
assert res["data"]["deleteDomain"] is True
256+
257+
res = raw_graphql(_DELETE_DOMAIN_MUTATION, {"urn": parent_urn})
258+
assert res["data"]["deleteDomain"] is True
259+
260+
finally:
261+
# Best-effort cleanup via REST, which bypasses the GraphQL child guard.
262+
for urn in (child_urn, parent_urn):
263+
try:
264+
delete_entity(auth_session, urn)
265+
except Exception:
266+
pass

0 commit comments

Comments
 (0)