Skip to content

Commit 5c1404e

Browse files
author
Ali Poursamadi
committed
[controller][schema] Move migration schema normalization into
StoreSchemaService
1 parent cd16e9e commit 5c1404e

7 files changed

Lines changed: 86 additions & 85 deletions

File tree

services/venice-controller/src/main/java/com/linkedin/venice/controller/ParentSchemaOrchestrator.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ SchemaEntry addValueSchema(
9090
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
9191
parent.acquireAdminMessageLock(clusterName, storeName);
9292
try {
93-
newValueSchemaStr = parent.getVeniceHelixAdmin()
94-
.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
93+
newValueSchemaStr = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
9594
final int newValueSchemaId = storeSchemaService.checkPreConditionForAddValueSchemaAndGetNewSchemaId(
9695
clusterName,
9796
storeName,
@@ -253,8 +252,7 @@ SchemaEntry addValueSchema(
253252
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
254253
parent.acquireAdminMessageLock(clusterName, storeName);
255254
try {
256-
newValueSchemaStr =
257-
parent.getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
255+
newValueSchemaStr = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
258256
Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr);
259257

260258
final Store store = parent.getVeniceHelixAdmin().getStore(clusterName, storeName);

services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreSchemaService.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import com.linkedin.avroutil1.compatibility.RecordGenerationConfig;
99
import com.linkedin.venice.exceptions.InvalidVeniceSchemaException;
1010
import com.linkedin.venice.exceptions.VeniceException;
11+
import com.linkedin.venice.helix.ZkStoreConfigAccessor;
1112
import com.linkedin.venice.meta.ReadWriteSchemaRepository;
1213
import com.linkedin.venice.meta.Store;
14+
import com.linkedin.venice.meta.StoreConfig;
1315
import com.linkedin.venice.meta.Version;
1416
import com.linkedin.venice.schema.AvroSchemaParseUtils;
1517
import com.linkedin.venice.schema.GeneratedSchemaID;
@@ -235,13 +237,67 @@ private void validateValueSchemaUsingRandomGenerator(String schemaStr, String cl
235237
}
236238
}
237239

240+
/**
241+
* If {@code storeName} is migrating into {@code clusterName}, accept schemas that fail strict parse only because of
242+
* {@code validateNumericDefaultValueTypes} (e.g. legacy {@code {"type":"float","default":0}}) by walking the JSON
243+
* and coercing numeric defaults to the declared field type. The output is strict-parse-clean, which keeps downstream
244+
* consumers that strict-parse (DaVinci's {@code SchemaUtils.annotateValueSchema}, VPJ, Samza producer) working.
245+
*
246+
* Re-strict-parses the coerced output as a defensive check so anything beyond the numeric-default tier (bad names,
247+
* dangling content, union default not first branch) still fails loudly.
248+
*
249+
* For non-migration calls, and for migration calls whose input is already strict-clean, returns the input unchanged —
250+
* so this can be wired into entry points idempotently.
251+
*
252+
* @return possibly-coerced schema string that is guaranteed to pass strict parsing.
253+
*/
254+
String normalizeSchemaForMigration(String clusterName, String storeName, String schemaStr) {
255+
ZkStoreConfigAccessor accessor = admin.getStoreConfigAccessor(clusterName);
256+
if (!accessor.containsConfig(storeName)) {
257+
return schemaStr;
258+
}
259+
StoreConfig cfg = accessor.getStoreConfig(storeName);
260+
if (cfg == null || !clusterName.equals(cfg.getMigrationDestCluster())) {
261+
return schemaStr;
262+
}
263+
// Migration context. If strict already passes, leave the string unchanged so we don't
264+
// introduce gratuitous diffs against the source schema.
265+
try {
266+
AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr);
267+
return schemaStr;
268+
} catch (Exception strictFailure) {
269+
LOGGER.info(
270+
"Strict parse failed for store {} migrating into cluster {}; attempting numeric-default coercion.",
271+
storeName,
272+
clusterName,
273+
strictFailure);
274+
String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schemaStr);
275+
// Defensive: anything LOOSE_NUMERICS would have been lenient about (union default not first
276+
// branch, bad names, etc.) is outside the coercion scope and must still fail strict. When it
277+
// does, surface the *original* strict failure too — it's the one the operator needs to see.
278+
try {
279+
AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced);
280+
} catch (Exception coercedFailure) {
281+
coercedFailure.addSuppressed(strictFailure);
282+
throw coercedFailure;
283+
}
284+
if (!coerced.equals(schemaStr)) {
285+
LOGGER.info(
286+
"Coerced numeric default(s) in value schema for store {} migrating into cluster {}.",
287+
storeName,
288+
clusterName);
289+
}
290+
return coerced;
291+
}
292+
}
293+
238294
SchemaEntry addValueSchema(
239295
String clusterName,
240296
String storeName,
241297
String valueSchemaStr,
242298
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
243299
admin.checkControllerLeadershipFor(clusterName);
244-
valueSchemaStr = admin.normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
300+
valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
245301
ReadWriteSchemaRepository schemaRepository =
246302
admin.getHelixVeniceClusterResources(clusterName).getSchemaRepository();
247303
SchemaEntry schemaEntry = schemaRepository.addValueSchema(storeName, valueSchemaStr, expectedCompatibilityType);
@@ -260,7 +316,7 @@ SchemaEntry addValueSchema(
260316
int schemaId,
261317
DirectionalSchemaCompatibilityType compatibilityType) {
262318
admin.checkControllerLeadershipFor(clusterName);
263-
valueSchemaStr = admin.normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
319+
valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
264320
ReadWriteSchemaRepository schemaRepository =
265321
admin.getHelixVeniceClusterResources(clusterName).getSchemaRepository();
266322
int newValueSchemaId =
@@ -318,8 +374,8 @@ SchemaEntry addSupersetSchema(
318374
String supersetSchemaStr,
319375
int supersetSchemaId) {
320376
admin.checkControllerLeadershipFor(clusterName);
321-
valueSchema = admin.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
322-
supersetSchemaStr = admin.normalizeSchemaForMigration(clusterName, storeName, supersetSchemaStr);
377+
valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema);
378+
supersetSchemaStr = normalizeSchemaForMigration(clusterName, storeName, supersetSchemaStr);
323379
ReadWriteSchemaRepository schemaRepository =
324380
admin.getHelixVeniceClusterResources(clusterName).getSchemaRepository();
325381

@@ -364,7 +420,7 @@ int checkPreConditionForAddValueSchemaAndGetNewSchemaId(
364420
String storeName,
365421
String valueSchemaStr,
366422
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
367-
valueSchemaStr = admin.normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
423+
valueSchemaStr = normalizeSchemaForMigration(clusterName, storeName, valueSchemaStr);
368424
AvroSchemaUtils.validateAvroSchemaStr(valueSchemaStr);
369425
AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr);
370426
validateValueSchemaUsingRandomGenerator(valueSchemaStr, clusterName, storeName);

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,7 +1176,7 @@ public void createStore(
11761176
HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName);
11771177
LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner);
11781178
try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) {
1179-
valueSchema = normalizeSchemaForMigration(clusterName, storeName, valueSchema);
1179+
valueSchema = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
11801180
checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true);
11811181
VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
11821182
Store newStore = new ZKStore(
@@ -2257,61 +2257,6 @@ protected void checkPreConditionForCreateStore(
22572257
new SchemaEntry(SchemaData.INVALID_VALUE_SCHEMA_ID, valueSchema);
22582258
}
22592259

2260-
/**
2261-
* If {@code storeName} is migrating into {@code clusterName}, accept schemas that fail strict
2262-
* parse only because of {@code validateNumericDefaultValueTypes} (e.g. legacy
2263-
* {@code {"type":"float","default":0}}) by walking the JSON and coercing numeric defaults to the
2264-
* declared field type. The output is strict-parse-clean, which keeps downstream consumers that
2265-
* strict-parse (DaVinci's {@code SchemaUtils.annotateValueSchema}, VPJ, Samza producer) working.
2266-
*
2267-
* Re-strict-parses the coerced output as a defensive check so anything beyond the numeric-default
2268-
* tier (bad names, dangling content, union default not first branch) still fails loudly.
2269-
*
2270-
* For non-migration calls, and for migration calls whose input is already strict-clean, returns
2271-
* the input unchanged — so this can be wired into entry points idempotently.
2272-
*
2273-
* @return possibly-coerced schema string that is guaranteed to pass strict parsing.
2274-
*/
2275-
String normalizeSchemaForMigration(String clusterName, String storeName, String schemaStr) {
2276-
ZkStoreConfigAccessor accessor = getStoreConfigAccessor(clusterName);
2277-
if (!accessor.containsConfig(storeName)) {
2278-
return schemaStr;
2279-
}
2280-
StoreConfig cfg = accessor.getStoreConfig(storeName);
2281-
if (cfg == null || !clusterName.equals(cfg.getMigrationDestCluster())) {
2282-
return schemaStr;
2283-
}
2284-
// Migration context. If strict already passes, leave the string unchanged so we don't
2285-
// introduce gratuitous diffs against the source schema.
2286-
try {
2287-
AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(schemaStr);
2288-
return schemaStr;
2289-
} catch (Exception strictFailure) {
2290-
LOGGER.info(
2291-
"Strict parse failed for store {} migrating into cluster {}; attempting numeric-default coercion.",
2292-
storeName,
2293-
clusterName,
2294-
strictFailure);
2295-
String coerced = AvroSchemaParseUtils.coerceNumericDefaultsToFieldType(schemaStr);
2296-
// Defensive: anything LOOSE_NUMERICS would have been lenient about (union default not first
2297-
// branch, bad names, etc.) is outside the coercion scope and must still fail strict. When it
2298-
// does, surface the *original* strict failure too — it's the one the operator needs to see.
2299-
try {
2300-
AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(coerced);
2301-
} catch (Exception coercedFailure) {
2302-
coercedFailure.addSuppressed(strictFailure);
2303-
throw coercedFailure;
2304-
}
2305-
if (!coerced.equals(schemaStr)) {
2306-
LOGGER.info(
2307-
"Coerced numeric default(s) in value schema for store {} migrating into cluster {}.",
2308-
storeName,
2309-
clusterName);
2310-
}
2311-
return coerced;
2312-
}
2313-
}
2314-
23152260
void checkStoreGraveyardForRecreation(String clusterName, String storeName) {
23162261
HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName);
23172262

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,8 @@ public void createStore(
916916
Optional<String> accessPermissions) {
917917
acquireAdminMessageLock(clusterName, storeName);
918918
try {
919-
valueSchema = getVeniceHelixAdmin().normalizeSchemaForMigration(clusterName, storeName, valueSchema);
919+
valueSchema = getVeniceHelixAdmin().getStoreSchemaService()
920+
.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
920921
getVeniceHelixAdmin()
921922
.checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, false);
922923
LOGGER.info("Adding store: {} to cluster: {}", storeName, clusterName);

services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ public void setupInternalMocks() {
102102
SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA);
103103
doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString());
104104
// Outside a store-migration context, schema normalization is a passthrough. Mirror that here so
105-
// the mocked internal admin doesn't return null and blank out the value schema during createStore/
105+
// the mocked schema service doesn't return null and blank out the value schema during createStore/
106106
// addValueSchema.
107-
when(internalAdmin.normalizeSchemaForMigration(anyString(), anyString(), any()))
107+
when(storeSchemaService.normalizeSchemaForMigration(anyString(), anyString(), any()))
108108
.thenAnswer(invocation -> invocation.getArgument(2));
109109

110110
zkClient = mock(ZkClient.class);

services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreSchemaService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static org.mockito.ArgumentMatchers.anyInt;
55
import static org.mockito.ArgumentMatchers.anyString;
66
import static org.mockito.ArgumentMatchers.eq;
7-
import static org.mockito.Mockito.doAnswer;
87
import static org.mockito.Mockito.doReturn;
98
import static org.mockito.Mockito.doThrow;
109
import static org.mockito.Mockito.mock;
@@ -17,6 +16,7 @@
1716
import static org.testng.Assert.assertTrue;
1817

1918
import com.linkedin.venice.exceptions.VeniceException;
19+
import com.linkedin.venice.helix.ZkStoreConfigAccessor;
2020
import com.linkedin.venice.meta.ReadWriteSchemaRepository;
2121
import com.linkedin.venice.meta.Store;
2222
import com.linkedin.venice.meta.Version;
@@ -55,6 +55,7 @@ public class TestStoreSchemaService {
5555
private VeniceHelixAdmin admin;
5656
private HelixVeniceClusterResources resources;
5757
private ReadWriteSchemaRepository schemaRepo;
58+
private ZkStoreConfigAccessor storeConfigAccessor;
5859
private VeniceControllerClusterConfig config;
5960
private StoreSchemaService service;
6061

@@ -63,10 +64,11 @@ public void setUp() {
6364
admin = mock(VeniceHelixAdmin.class);
6465
resources = mock(HelixVeniceClusterResources.class);
6566
schemaRepo = mock(ReadWriteSchemaRepository.class);
67+
storeConfigAccessor = mock(ZkStoreConfigAccessor.class);
6668
config = mock(VeniceControllerClusterConfig.class);
6769
doReturn(resources).when(admin).getHelixVeniceClusterResources(CLUSTER);
68-
doAnswer(invocation -> invocation.getArgument(2)).when(admin)
69-
.normalizeSchemaForMigration(anyString(), anyString(), anyString());
70+
doReturn(storeConfigAccessor).when(admin).getStoreConfigAccessor(anyString());
71+
doReturn(false).when(storeConfigAccessor).containsConfig(anyString());
7072
doReturn(schemaRepo).when(resources).getSchemaRepository();
7173
doReturn(config).when(resources).getConfig();
7274
service = new StoreSchemaService(admin);

services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ public void testWriteEndOfPushThrowsForNonExistentStore() throws Exception {
477477
admin.writeEndOfPush("cluster", "missing_store", 1, false, null);
478478
}
479479

480-
// ---- normalizeSchemaForMigration ---------------------------------------------------------
480+
// ---- StoreSchemaService.normalizeSchemaForMigration --------------------------------------
481481

482482
private static final String CLEAN_VALUE_SCHEMA =
483483
"{\"type\":\"record\",\"name\":\"Clean\",\"fields\":[" + "{\"name\":\"v\",\"type\":\"string\"}]}";
@@ -491,23 +491,22 @@ public void testWriteEndOfPushThrowsForNonExistentStore() throws Exception {
491491
private static final String NON_NUMERIC_STRICT_VIOLATION_SCHEMA = "{\"type\":\"record\",\"name\":\"BadUnion\","
492492
+ "\"fields\":[{\"name\":\"f\",\"type\":[\"int\",\"null\"],\"default\":null}]}";
493493

494-
private static VeniceHelixAdmin newNormalizeMock(String clusterName, String storeName, StoreConfig storeConfig) {
494+
private static StoreSchemaService newNormalizeService(String clusterName, String storeName, StoreConfig storeConfig) {
495495
VeniceHelixAdmin admin = mock(VeniceHelixAdmin.class);
496496
ZkStoreConfigAccessor accessor = mock(ZkStoreConfigAccessor.class);
497497
doReturn(storeConfig != null).when(accessor).containsConfig(storeName);
498498
doReturn(storeConfig).when(accessor).getStoreConfig(storeName);
499499
doReturn(accessor).when(admin).getStoreConfigAccessor(clusterName);
500-
doCallRealMethod().when(admin).normalizeSchemaForMigration(anyString(), anyString(), anyString());
501-
return admin;
500+
return new StoreSchemaService(admin);
502501
}
503502

504503
@Test
505504
public void testNormalizeReturnsInputWhenStoreConfigAbsent() {
506505
String cluster = "venice-dest";
507506
String store = "legacy_store";
508-
VeniceHelixAdmin admin = newNormalizeMock(cluster, store, null);
507+
StoreSchemaService service = newNormalizeService(cluster, store, null);
509508
Assert.assertSame(
510-
admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA),
509+
service.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA),
511510
LEGACY_NUMERIC_DEFAULT_SCHEMA,
512511
"Non-migration context (no storeConfig) must return input by identity to avoid parse cost");
513512
}
@@ -519,9 +518,9 @@ public void testNormalizeReturnsInputWhenMigrationDestIsAnotherCluster() {
519518
StoreConfig cfg = mock(StoreConfig.class);
520519
doReturn("some_other_dest").when(cfg).getMigrationDestCluster();
521520

522-
VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg);
521+
StoreSchemaService service = newNormalizeService(cluster, store, cfg);
523522
Assert.assertSame(
524-
admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA),
523+
service.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA),
525524
LEGACY_NUMERIC_DEFAULT_SCHEMA,
526525
"When migrationDestCluster does not match this cluster, input must pass through unchanged");
527526
}
@@ -533,9 +532,9 @@ public void testNormalizeReturnsInputWhenStrictAlreadyPasses() {
533532
StoreConfig cfg = mock(StoreConfig.class);
534533
doReturn(cluster).when(cfg).getMigrationDestCluster();
535534

536-
VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg);
535+
StoreSchemaService service = newNormalizeService(cluster, store, cfg);
537536
Assert.assertSame(
538-
admin.normalizeSchemaForMigration(cluster, store, CLEAN_VALUE_SCHEMA),
537+
service.normalizeSchemaForMigration(cluster, store, CLEAN_VALUE_SCHEMA),
539538
CLEAN_VALUE_SCHEMA,
540539
"Strict-clean input under migration context must not be reserialized");
541540
}
@@ -547,8 +546,8 @@ public void testNormalizeReserializesLegacyNumericDefault() {
547546
StoreConfig cfg = mock(StoreConfig.class);
548547
doReturn(cluster).when(cfg).getMigrationDestCluster();
549548

550-
VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg);
551-
String normalized = admin.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA);
549+
StoreSchemaService service = newNormalizeService(cluster, store, cfg);
550+
String normalized = service.normalizeSchemaForMigration(cluster, store, LEGACY_NUMERIC_DEFAULT_SCHEMA);
552551

553552
Assert.assertNotEquals(normalized, LEGACY_NUMERIC_DEFAULT_SCHEMA, "Legacy schema must be reserialized");
554553
// The whole point: output must be strict-parse-clean so downstream consumers don't trip.
@@ -562,12 +561,12 @@ public void testNormalizeRejectsNonNumericStrictViolation() {
562561
StoreConfig cfg = mock(StoreConfig.class);
563562
doReturn(cluster).when(cfg).getMigrationDestCluster();
564563

565-
VeniceHelixAdmin admin = newNormalizeMock(cluster, store, cfg);
564+
StoreSchemaService service = newNormalizeService(cluster, store, cfg);
566565
// Migration context, but the violation is outside the numeric-default tier — must propagate.
567566
// The original strict failure must ride along as a suppressed exception so the operator can
568567
// see what was actually wrong with the source schema, not just that post-coercion strict tripped.
569568
try {
570-
admin.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA);
569+
service.normalizeSchemaForMigration(cluster, store, NON_NUMERIC_STRICT_VIOLATION_SCHEMA);
571570
Assert.fail("Expected normalize to throw on non-numeric strict violation");
572571
} catch (Exception coercedFailure) {
573572
Assert.assertTrue(

0 commit comments

Comments
 (0)