Skip to content

Commit 82c0c9c

Browse files
feat(gms): add primary storage read pools for Ebean and Cassandra
Route non-locking aspect reads to optional read pools, with OperationContext read preference and JDBC read-only connections for Postgres/MySQL; add docs and Testcontainers integration tests for split-pool routing.

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent a1d465d commit 82c0c9c

65 files changed

Lines changed: 1745 additions & 147 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

AGENTS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,11 @@ scripts/dev/datahub-dev.sh env list # show current vars and pending_re
507507

508508
**Do NOT** manually edit `.env` files, use `docker compose -e`, or `export` — always use the wrapper.
509509

510+
**GMS primary storage read pool** (optional, entity aspect DAO only): `EBEAN_READ_POOL_ENABLED` /
511+
`CASSANDRA_READ_POOL_ENABLED` route non-locking reads to a second pool; writes and `forUpdate`
512+
reads stay on PRIMARY. See [docs/deploy/primary-storage-read-pool.md](docs/deploy/primary-storage-read-pool.md).
513+
`DATAHUB_READ_ONLY=true` is separate — it disables writes and does not register the read pool.
514+
510515
### Feature Flag Lifecycle
511516

512517
**All flag changes require a container restart.** Use `env set` + `env restart`:

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/knowledge/DocumentChangeHistoryResolver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public CompletableFuture<List<DocumentChange>> get(DataFetchingEnvironment envir
7474
// Get timeline from TimelineService
7575
List<ChangeTransaction> transactions =
7676
_timelineService.getTimeline(
77+
context.getOperationContext(),
7778
documentUrn,
7879
categories,
7980
startTime,

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/timeline/GetSchemaBlameResolver.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,14 @@ public CompletableFuture<GetSchemaBlameResult> get(final DataFetchingEnvironment
6161
Collections.singleton(ChangeCategory.TECHNICAL_SCHEMA);
6262
final List<ChangeTransaction> changeTransactionList =
6363
_timelineService.getTimeline(
64-
datasetUrn, changeCategorySet, startTime, endTime, null, null, false);
64+
context.getOperationContext(),
65+
datasetUrn,
66+
changeCategorySet,
67+
startTime,
68+
endTime,
69+
null,
70+
null,
71+
false);
6572
return SchemaBlameMapper.map(changeTransactionList, version);
6673
} catch (Exception e) {
6774
log.error("Failed to list schema blame data", e);

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/timeline/GetSchemaVersionListResolver.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ public CompletableFuture<GetSchemaVersionListResult> get(
6060
changeCategorySet.add(ChangeCategory.TECHNICAL_SCHEMA);
6161
List<ChangeTransaction> changeTransactionList =
6262
_timelineService.getTimeline(
63-
datasetUrn, changeCategorySet, startTime, endTime, null, null, false);
63+
context.getOperationContext(),
64+
datasetUrn,
65+
changeCategorySet,
66+
startTime,
67+
endTime,
68+
null,
69+
null,
70+
false);
6471
return SchemaVersionListMapper.map(changeTransactionList);
6572
} catch (Exception e) {
6673
log.error("Failed to list schema version data", e);

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/timeline/GetTimelineResolver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public CompletableFuture<GetTimelineResult> get(final DataFetchingEnvironment en
6666
: Arrays.stream(ChangeCategory.values()).collect(Collectors.toSet());
6767
final List<ChangeTransaction> changeTransactionList =
6868
_timelineService.getTimeline(
69+
context.getOperationContext(),
6970
entityUrn,
7071
changeCategorySet,
7172
TimelineService.DEFAULT_MAX_CHANGE_TRANSACTIONS,

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/knowledge/DocumentChangeHistoryResolverTest.java

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.linkedin.metadata.timeline.data.ChangeOperation;
2020
import com.linkedin.metadata.timeline.data.ChangeTransaction;
2121
import graphql.schema.DataFetchingEnvironment;
22+
import io.datahubproject.metadata.context.OperationContext;
2223
import java.util.ArrayList;
2324
import java.util.HashMap;
2425
import java.util.List;
@@ -80,6 +81,7 @@ public void testGetChangeHistorySuccess() throws Exception {
8081
transactions.add(transaction);
8182

8283
when(mockTimelineService.getTimeline(
84+
any(OperationContext.class),
8385
eq(TEST_DOCUMENT_URN),
8486
any(Set.class),
8587
anyLong(),
@@ -126,7 +128,14 @@ public void testGetChangeHistoryWithContentModification() throws Exception {
126128
transactions.add(transaction);
127129

128130
when(mockTimelineService.getTimeline(
129-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
131+
any(OperationContext.class),
132+
any(Urn.class),
133+
any(Set.class),
134+
anyLong(),
135+
anyLong(),
136+
isNull(),
137+
isNull(),
138+
eq(false)))
130139
.thenReturn(transactions);
131140

132141
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -166,7 +175,14 @@ public void testGetChangeHistoryWithParentChange() throws Exception {
166175
transactions.add(transaction);
167176

168177
when(mockTimelineService.getTimeline(
169-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
178+
any(OperationContext.class),
179+
any(Urn.class),
180+
any(Set.class),
181+
anyLong(),
182+
anyLong(),
183+
isNull(),
184+
isNull(),
185+
eq(false)))
170186
.thenReturn(transactions);
171187

172188
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -201,7 +217,14 @@ public void testGetChangeHistoryWithStateChange() throws Exception {
201217
transactions.add(transaction);
202218

203219
when(mockTimelineService.getTimeline(
204-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
220+
any(OperationContext.class),
221+
any(Urn.class),
222+
any(Set.class),
223+
anyLong(),
224+
anyLong(),
225+
isNull(),
226+
isNull(),
227+
eq(false)))
205228
.thenReturn(transactions);
206229

207230
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -221,6 +244,7 @@ public void testGetChangeHistoryWithCustomTimeRange() throws Exception {
221244
when(mockEnv.getArgument("limit")).thenReturn(100);
222245

223246
when(mockTimelineService.getTimeline(
247+
any(OperationContext.class),
224248
eq(TEST_DOCUMENT_URN),
225249
any(Set.class),
226250
eq(startTime),
@@ -235,6 +259,7 @@ public void testGetChangeHistoryWithCustomTimeRange() throws Exception {
235259
assertNotNull(result);
236260
verify(mockTimelineService, times(1))
237261
.getTimeline(
262+
any(OperationContext.class),
238263
eq(TEST_DOCUMENT_URN),
239264
any(Set.class),
240265
eq(startTime),
@@ -280,7 +305,14 @@ public void testGetChangeHistoryMultipleChanges() throws Exception {
280305
transactions.add(transaction2);
281306

282307
when(mockTimelineService.getTimeline(
283-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
308+
any(OperationContext.class),
309+
any(Urn.class),
310+
any(Set.class),
311+
anyLong(),
312+
anyLong(),
313+
isNull(),
314+
isNull(),
315+
eq(false)))
284316
.thenReturn(transactions);
285317

286318
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -294,7 +326,14 @@ public void testGetChangeHistoryMultipleChanges() throws Exception {
294326
@Test(expectedExceptions = Exception.class)
295327
public void testGetChangeHistoryServiceThrowsException() throws Exception {
296328
when(mockTimelineService.getTimeline(
297-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
329+
any(OperationContext.class),
330+
any(Urn.class),
331+
any(Set.class),
332+
anyLong(),
333+
anyLong(),
334+
isNull(),
335+
isNull(),
336+
eq(false)))
298337
.thenThrow(new RuntimeException("Service error"));
299338

300339
// Should throw an exception when service fails
@@ -304,7 +343,14 @@ public void testGetChangeHistoryServiceThrowsException() throws Exception {
304343
@Test
305344
public void testGetChangeHistoryEmptyResult() throws Exception {
306345
when(mockTimelineService.getTimeline(
307-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
346+
any(OperationContext.class),
347+
any(Urn.class),
348+
any(Set.class),
349+
anyLong(),
350+
anyLong(),
351+
isNull(),
352+
isNull(),
353+
eq(false)))
308354
.thenReturn(new ArrayList<>());
309355

310356
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -341,7 +387,14 @@ public void testGetChangeHistoryWithRelatedAssetChange() throws Exception {
341387
transactions.add(transaction);
342388

343389
when(mockTimelineService.getTimeline(
344-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
390+
any(OperationContext.class),
391+
any(Urn.class),
392+
any(Set.class),
393+
anyLong(),
394+
anyLong(),
395+
isNull(),
396+
isNull(),
397+
eq(false)))
345398
.thenReturn(transactions);
346399

347400
List<DocumentChange> result = resolver.get(mockEnv).get();
@@ -394,7 +447,14 @@ public void testGetChangeHistoryRespectsLimit() throws Exception {
394447

395448
when(mockEnv.getArgument("limit")).thenReturn(10);
396449
when(mockTimelineService.getTimeline(
397-
any(Urn.class), any(Set.class), anyLong(), anyLong(), isNull(), isNull(), eq(false)))
450+
any(OperationContext.class),
451+
any(Urn.class),
452+
any(Set.class),
453+
anyLong(),
454+
anyLong(),
455+
isNull(),
456+
isNull(),
457+
eq(false)))
398458
.thenReturn(transactions);
399459

400460
List<DocumentChange> result = resolver.get(mockEnv).get();

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/timeline/GetSchemaBlameResolverTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void testGetUnauthorizedThrowsAndDoesNotQueryDb() throws Exception {
3939
public void testGetAuthorizedInvokesTimelineService() throws Exception {
4040
TimelineService mockTimelineService = mock(TimelineService.class);
4141
when(mockTimelineService.getTimeline(
42-
any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean()))
42+
any(), any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean()))
4343
.thenReturn(Collections.emptyList());
4444

4545
GetSchemaBlameResolver resolver = new GetSchemaBlameResolver(mockTimelineService);
@@ -54,6 +54,6 @@ public void testGetAuthorizedInvokesTimelineService() throws Exception {
5454

5555
resolver.get(mockEnv).get();
5656
verify(mockTimelineService, times(1))
57-
.getTimeline(any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean());
57+
.getTimeline(any(), any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean());
5858
}
5959
}

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/timeline/GetSchemaVersionListResolverTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void testGetUnauthorizedThrowsAndDoesNotQueryDb() throws Exception {
3939
public void testGetAuthorizedInvokesTimelineService() throws Exception {
4040
TimelineService mockTimelineService = mock(TimelineService.class);
4141
when(mockTimelineService.getTimeline(
42-
any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean()))
42+
any(), any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean()))
4343
.thenReturn(Collections.emptyList());
4444

4545
GetSchemaVersionListResolver resolver = new GetSchemaVersionListResolver(mockTimelineService);
@@ -54,6 +54,6 @@ public void testGetAuthorizedInvokesTimelineService() throws Exception {
5454

5555
resolver.get(mockEnv).get();
5656
verify(mockTimelineService, times(1))
57-
.getTimeline(any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean());
57+
.getTimeline(any(), any(), any(), anyLong(), anyLong(), any(), any(), anyBoolean());
5858
}
5959
}

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/timeline/GetTimelineResolverTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void testGetUnauthorizedThrowsAndDoesNotQueryDb() {
6363
@Test
6464
public void testGetAuthorizedReturnsResult() throws Exception {
6565
TimelineService mockTimelineService = mock(TimelineService.class);
66-
when(mockTimelineService.getTimeline(any(), any(), anyInt(), anyBoolean()))
66+
when(mockTimelineService.getTimeline(any(), any(), any(), anyInt(), anyBoolean()))
6767
.thenReturn(List.of());
6868

6969
GetTimelineResolver resolver = new GetTimelineResolver(mockTimelineService);
@@ -77,6 +77,6 @@ public void testGetAuthorizedReturnsResult() throws Exception {
7777
when(mockEnv.getArgument("input")).thenReturn(input);
7878

7979
assertNotNull(resolver.get(mockEnv).get());
80-
verify(mockTimelineService, times(1)).getTimeline(any(), any(), anyInt(), anyBoolean());
80+
verify(mockTimelineService, times(1)).getTimeline(any(), any(), any(), anyInt(), anyBoolean());
8181
}
8282
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/loadindices/LoadIndicesStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
1717
import com.linkedin.metadata.entity.ebean.PartitionedStream;
1818
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
19+
import com.linkedin.metadata.entity.storage.PrimaryStorageResolver;
1920
import com.linkedin.metadata.service.UpdateIndicesService;
2021
import com.linkedin.metadata.utils.PegasusUtils;
2122
import com.linkedin.mxe.MetadataChangeLog;
@@ -178,7 +179,7 @@ private LoadIndicesResult processAllDataDirectly(
178179
// Create EbeanAspectDao for streaming
179180
EbeanAspectDao aspectDao =
180181
new EbeanAspectDao(
181-
server,
182+
PrimaryStorageResolver.forSingleEbeanDatabase(server),
182183
EbeanConfiguration.testDefault,
183184
null,
184185
java.util.Collections.emptyList(),

0 commit comments

Comments
 (0)