Skip to content

Commit 9a44f94

Browse files
author
Ali Poursamadi
committed
Rename store schema service to manager
1 parent 20a55cd commit 9a44f94

8 files changed

Lines changed: 122 additions & 122 deletions

File tree

services/venice-controller/src/main/java/com/linkedin/venice/controller/ParentSchemaOrchestrator.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,18 @@ class ParentSchemaOrchestrator {
4545
private static final Logger LOGGER = LogManager.getLogger(ParentSchemaOrchestrator.class);
4646

4747
private final VeniceParentHelixAdmin parent;
48-
private final StoreSchemaService storeSchemaService;
48+
private final StoreSchemaManager storeSchemaManager;
4949
private final WriteComputeSchemaConverter writeComputeSchemaConverter;
5050
private final Optional<SupersetSchemaGenerator> externalSupersetSchemaGenerator;
5151
private final SupersetSchemaGenerator defaultSupersetSchemaGenerator = new DefaultSupersetSchemaGenerator();
5252

5353
ParentSchemaOrchestrator(
5454
VeniceParentHelixAdmin parent,
55-
StoreSchemaService storeSchemaService,
55+
StoreSchemaManager storeSchemaManager,
5656
WriteComputeSchemaConverter writeComputeSchemaConverter,
5757
Optional<SupersetSchemaGenerator> externalSupersetSchemaGenerator) {
5858
this.parent = parent;
59-
this.storeSchemaService = storeSchemaService;
59+
this.storeSchemaManager = storeSchemaManager;
6060
this.writeComputeSchemaConverter = writeComputeSchemaConverter;
6161
this.externalSupersetSchemaGenerator = externalSupersetSchemaGenerator;
6262
}
@@ -90,8 +90,8 @@ SchemaEntry addValueSchema(
9090
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
9191
parent.acquireAdminMessageLock(clusterName, storeName);
9292
try {
93-
newValueSchemaStr = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
94-
final int newValueSchemaId = storeSchemaService.checkPreConditionForAddValueSchemaAndGetNewSchemaId(
93+
newValueSchemaStr = storeSchemaManager.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
94+
final int newValueSchemaId = storeSchemaManager.checkPreConditionForAddValueSchemaAndGetNewSchemaId(
9595
clusterName,
9696
storeName,
9797
newValueSchemaStr,
@@ -252,11 +252,11 @@ SchemaEntry addValueSchema(
252252
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
253253
parent.acquireAdminMessageLock(clusterName, storeName);
254254
try {
255-
newValueSchemaStr = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
255+
newValueSchemaStr = storeSchemaManager.normalizeSchemaForMigration(clusterName, storeName, newValueSchemaStr);
256256
Schema newValueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(newValueSchemaStr);
257257

258258
final Store store = parent.getVeniceHelixAdmin().getStore(clusterName, storeName);
259-
Schema existingValueSchema = storeSchemaService.getSupersetOrLatestValueSchema(clusterName, store);
259+
Schema existingValueSchema = storeSchemaManager.getSupersetOrLatestValueSchema(clusterName, store);
260260

261261
// Update superset schema if:
262262
// 1. Compute is enabled (existing behavior), OR
@@ -290,14 +290,14 @@ SchemaEntry addValueSchema(
290290
// Register superset schema only if it does not match with existing or new schema.
291291

292292
// validate compatibility of the new superset schema
293-
storeSchemaService.checkPreConditionForAddValueSchemaAndGetNewSchemaId(
293+
storeSchemaManager.checkPreConditionForAddValueSchemaAndGetNewSchemaId(
294294
clusterName,
295295
storeName,
296296
newSuperSetSchemaStr,
297297
expectedCompatibilityType);
298298
// Check if the superset schema already exists or not. If exists use the same ID, else bump the value ID by
299299
// one.
300-
int supersetSchemaId = storeSchemaService.getValueSchemaIdIgnoreFieldOrder(
300+
int supersetSchemaId = storeSchemaManager.getValueSchemaIdIgnoreFieldOrder(
301301
clusterName,
302302
storeName,
303303
newSuperSetSchemaStr,
@@ -369,7 +369,7 @@ DerivedSchemaEntry addDerivedSchema(
369369
String derivedSchemaStr) {
370370
parent.acquireAdminMessageLock(clusterName, storeName);
371371
try {
372-
int newDerivedSchemaId = storeSchemaService.checkPreConditionForAddDerivedSchemaAndGetNewSchemaId(
372+
int newDerivedSchemaId = storeSchemaManager.checkPreConditionForAddDerivedSchemaAndGetNewSchemaId(
373373
clusterName,
374374
storeName,
375375
valueSchemaId,
@@ -426,7 +426,7 @@ RmdSchemaEntry addReplicationMetadataSchema(
426426
RmdSchemaEntry rmdSchemaEntry =
427427
new RmdSchemaEntry(valueSchemaId, replicationMetadataVersionId, replicationMetadataSchemaStr);
428428
final boolean replicationMetadataSchemaAlreadyPresent =
429-
storeSchemaService.checkIfMetadataSchemaAlreadyPresent(clusterName, storeName, rmdSchemaEntry);
429+
storeSchemaManager.checkIfMetadataSchemaAlreadyPresent(clusterName, storeName, rmdSchemaEntry);
430430
if (replicationMetadataSchemaAlreadyPresent) {
431431
LOGGER.info(
432432
"Replication metadata schema already exists for store: {} in cluster: {} metadataSchema: {} "
@@ -531,7 +531,7 @@ void updateReplicationMetadataSchemaForAllValueSchema(String clusterName, String
531531
void updateReplicationMetadataSchema(String clusterName, String storeName, Schema valueSchema, int valueSchemaId) {
532532
final int rmdVersionId = parent.getRmdVersionID(storeName, clusterName);
533533
final boolean valueSchemaAlreadyHasRmdSchema =
534-
storeSchemaService.checkIfValueSchemaAlreadyHasRmdSchema(clusterName, storeName, valueSchemaId, rmdVersionId);
534+
storeSchemaManager.checkIfValueSchemaAlreadyHasRmdSchema(clusterName, storeName, valueSchemaId, rmdVersionId);
535535
if (valueSchemaAlreadyHasRmdSchema) {
536536
LOGGER.info(
537537
"Store {} in cluster {} already has a replication metadata schema for its value schema with ID {} and "

services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreSchemaService.java renamed to services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreSchemaManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848
* delegate here; this class reaches back into the admin only for the cluster-leadership check and per-cluster
4949
* resources/meta-store accessors.
5050
*/
51-
class StoreSchemaService {
52-
private static final Logger LOGGER = LogManager.getLogger(StoreSchemaService.class);
51+
class StoreSchemaManager {
52+
private static final Logger LOGGER = LogManager.getLogger(StoreSchemaManager.class);
5353
private static final int RECORD_COUNT = 10;
5454

5555
private final VeniceHelixAdmin admin;
5656

57-
StoreSchemaService(VeniceHelixAdmin admin) {
57+
StoreSchemaManager(VeniceHelixAdmin admin) {
5858
this.admin = admin;
5959
}
6060

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
396396
private final Lazy<PushStatusStoreWriter> pushStatusStoreWriter;
397397
private final SharedHelixReadOnlyZKSharedSystemStoreRepository zkSharedSystemStoreRepository;
398398
private final SharedHelixReadOnlyZKSharedSchemaRepository zkSharedSchemaRepository;
399-
private final StoreSchemaService storeSchemaService;
399+
private final StoreSchemaManager storeSchemaManager;
400400
private final MetaStoreWriter metaStoreWriter;
401401
private final MetaStoreReader metaStoreReader;
402402
private final D2Client d2Client;
@@ -629,7 +629,7 @@ public VeniceHelixAdmin(
629629
commonConfig.getSystemSchemaClusterName(),
630630
commonConfig.getRefreshAttemptsForZkReconnect(),
631631
commonConfig.getRefreshIntervalForZkReconnectInMs());
632-
storeSchemaService = new StoreSchemaService(this);
632+
storeSchemaManager = new StoreSchemaManager(this);
633633
metaStoreWriter = new MetaStoreWriter(
634634
topicManagerRepository.getLocalTopicManager(),
635635
veniceWriterFactory,
@@ -1172,7 +1172,7 @@ public void createStore(
11721172
HelixVeniceClusterResources clusterResources = getHelixVeniceClusterResources(clusterName);
11731173
LOGGER.info("Start creating store {} in cluster {} with owner {}", storeName, clusterName, owner);
11741174
try (AutoCloseableLock ignore = clusterResources.getClusterLockManager().createStoreWriteLock(storeName)) {
1175-
valueSchema = storeSchemaService.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
1175+
valueSchema = storeSchemaManager.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
11761176
checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, true);
11771177
VeniceControllerClusterConfig config = getHelixVeniceClusterResources(clusterName).getConfig();
11781178
Store newStore = new ZKStore(
@@ -3852,7 +3852,7 @@ public Optional<Schema> getReplicationMetadataSchema(
38523852
String storeName,
38533853
int valueSchemaID,
38543854
int rmdVersionID) {
3855-
return storeSchemaService.getReplicationMetadataSchema(clusterName, storeName, valueSchemaID, rmdVersionID);
3855+
return storeSchemaManager.getReplicationMetadataSchema(clusterName, storeName, valueSchemaID, rmdVersionID);
38563856
}
38573857

38583858
@Override
@@ -5359,11 +5359,11 @@ public StoreMetaValue getMetaStoreValue(StoreMetaKey metaKey, String storeName)
53595359

53605360
@Override
53615361
public Set<Integer> getInUseValueSchemaIds(String clusterName, String storeName) {
5362-
return storeSchemaService.getInUseValueSchemaIds(clusterName, storeName);
5362+
return storeSchemaManager.getInUseValueSchemaIds(clusterName, storeName);
53635363
}
53645364

53655365
public void deleteValueSchemas(String clusterName, String storeName, Set<Integer> unusedValueSchemaIds) {
5366-
storeSchemaService.deleteValueSchemas(clusterName, storeName, unusedValueSchemaIds);
5366+
storeSchemaManager.deleteValueSchemas(clusterName, storeName, unusedValueSchemaIds);
53675367
}
53685368

53695369
public void setStoreCompressionStrategy(
@@ -6298,47 +6298,47 @@ public void enableDisabledPartition(String clusterName, String kafkaTopic, boole
62986298
*/
62996299
@Override
63006300
public SchemaEntry getKeySchema(String clusterName, String storeName) {
6301-
return storeSchemaService.getKeySchema(clusterName, storeName);
6301+
return storeSchemaManager.getKeySchema(clusterName, storeName);
63026302
}
63036303

63046304
/**
63056305
* @return the value schema for the specified store.
63066306
*/
63076307
@Override
63086308
public Collection<SchemaEntry> getValueSchemas(String clusterName, String storeName) {
6309-
return storeSchemaService.getValueSchemas(clusterName, storeName);
6309+
return storeSchemaManager.getValueSchemas(clusterName, storeName);
63106310
}
63116311

63126312
/**
63136313
* @return the derived schema for the specified store.
63146314
*/
63156315
@Override
63166316
public Collection<DerivedSchemaEntry> getDerivedSchemas(String clusterName, String storeName) {
6317-
return storeSchemaService.getDerivedSchemas(clusterName, storeName);
6317+
return storeSchemaManager.getDerivedSchemas(clusterName, storeName);
63186318
}
63196319

63206320
/**
63216321
* @return the schema id for the specified store and value schema.
63226322
*/
63236323
@Override
63246324
public int getValueSchemaId(String clusterName, String storeName, String valueSchemaStr) {
6325-
return storeSchemaService.getValueSchemaId(clusterName, storeName, valueSchemaStr);
6325+
return storeSchemaManager.getValueSchemaId(clusterName, storeName, valueSchemaStr);
63266326
}
63276327

63286328
/**
63296329
* @return the derived schema id for the specified store and derived schema.
63306330
*/
63316331
@Override
63326332
public GeneratedSchemaID getDerivedSchemaId(String clusterName, String storeName, String schemaStr) {
6333-
return storeSchemaService.getDerivedSchemaId(clusterName, storeName, schemaStr);
6333+
return storeSchemaManager.getDerivedSchemaId(clusterName, storeName, schemaStr);
63346334
}
63356335

63366336
/**
63376337
* @return the derived schema for the specified store and id.
63386338
*/
63396339
@Override
63406340
public SchemaEntry getValueSchema(String clusterName, String storeName, int id) {
6341-
return storeSchemaService.getValueSchema(clusterName, storeName, id);
6341+
return storeSchemaManager.getValueSchema(clusterName, storeName, id);
63426342
}
63436343

63446344
/**
@@ -6350,7 +6350,7 @@ public SchemaEntry addValueSchema(
63506350
String storeName,
63516351
String valueSchemaStr,
63526352
DirectionalSchemaCompatibilityType expectedCompatibilityType) {
6353-
return storeSchemaService.addValueSchema(clusterName, storeName, valueSchemaStr, expectedCompatibilityType);
6353+
return storeSchemaManager.addValueSchema(clusterName, storeName, valueSchemaStr, expectedCompatibilityType);
63546354
}
63556355

63566356
/**
@@ -6365,7 +6365,7 @@ public SchemaEntry addValueSchema(
63656365
String valueSchemaStr,
63666366
int schemaId,
63676367
DirectionalSchemaCompatibilityType compatibilityType) {
6368-
return storeSchemaService.addValueSchema(clusterName, storeName, valueSchemaStr, schemaId, compatibilityType);
6368+
return storeSchemaManager.addValueSchema(clusterName, storeName, valueSchemaStr, schemaId, compatibilityType);
63696369
}
63706370

63716371
/**
@@ -6379,7 +6379,7 @@ public DerivedSchemaEntry addDerivedSchema(
63796379
String storeName,
63806380
int valueSchemaId,
63816381
String derivedSchemaStr) {
6382-
return storeSchemaService.addDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaStr);
6382+
return storeSchemaManager.addDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaStr);
63836383
}
63846384

63856385
/**
@@ -6393,7 +6393,7 @@ public DerivedSchemaEntry addDerivedSchema(
63936393
int valueSchemaId,
63946394
int derivedSchemaId,
63956395
String derivedSchemaStr) {
6396-
return storeSchemaService
6396+
return storeSchemaManager
63976397
.addDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaId, derivedSchemaStr);
63986398
}
63996399

@@ -6406,7 +6406,7 @@ public DerivedSchemaEntry removeDerivedSchema(
64066406
String storeName,
64076407
int valueSchemaId,
64086408
int derivedSchemaId) {
6409-
return storeSchemaService.removeDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaId);
6409+
return storeSchemaManager.removeDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaId);
64106410
}
64116411

64126412
/**
@@ -6423,7 +6423,7 @@ public SchemaEntry addSupersetSchema(
64236423
int valueSchemaId,
64246424
String supersetSchemaStr,
64256425
int supersetSchemaId) {
6426-
return storeSchemaService
6426+
return storeSchemaManager
64276427
.addSupersetSchema(clusterName, storeName, valueSchema, valueSchemaId, supersetSchemaStr, supersetSchemaId);
64286428
}
64296429

@@ -6432,7 +6432,7 @@ public SchemaEntry addSupersetSchema(
64326432
*/
64336433
@Override
64346434
public Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String clusterName, String storeName) {
6435-
return storeSchemaService.getReplicationMetadataSchemas(clusterName, storeName);
6435+
return storeSchemaManager.getReplicationMetadataSchemas(clusterName, storeName);
64366436
}
64376437

64386438
/**
@@ -6447,7 +6447,7 @@ public RmdSchemaEntry addReplicationMetadataSchema(
64476447
int valueSchemaId,
64486448
int replicationMetadataVersionId,
64496449
String replicationMetadataSchemaStr) {
6450-
return storeSchemaService.addReplicationMetadataSchema(
6450+
return storeSchemaManager.addReplicationMetadataSchema(
64516451
clusterName,
64526452
storeName,
64536453
valueSchemaId,
@@ -6565,8 +6565,8 @@ public Map<String, String> getStorageNodesStatus(String clusterName, boolean ena
65656565
return instancesStatusesMap;
65666566
}
65676567

6568-
StoreSchemaService getStoreSchemaService() {
6569-
return storeSchemaService;
6568+
StoreSchemaManager getStoreSchemaManager() {
6569+
return storeSchemaManager;
65706570
}
65716571

65726572
/**

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ public VeniceParentHelixAdmin(
475475

476476
this.parentSchemaOrchestrator = new ParentSchemaOrchestrator(
477477
this,
478-
veniceHelixAdmin.getStoreSchemaService(),
478+
veniceHelixAdmin.getStoreSchemaManager(),
479479
writeComputeSchemaConverter,
480480
externalSupersetSchemaGenerator);
481481
Class<IdentityParser> identityParserClass =
@@ -949,7 +949,7 @@ public void createStore(
949949
Optional<String> accessPermissions) {
950950
acquireAdminMessageLock(clusterName, storeName);
951951
try {
952-
valueSchema = getVeniceHelixAdmin().getStoreSchemaService()
952+
valueSchema = getVeniceHelixAdmin().getStoreSchemaManager()
953953
.normalizeSchemaForMigration(clusterName, storeName, valueSchema);
954954
getVeniceHelixAdmin()
955955
.checkPreConditionForCreateStore(clusterName, storeName, keySchema, valueSchema, isSystemStore, false);

services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class AbstractTestVeniceParentHelixAdmin {
7171

7272
TopicManager topicManager;
7373
VeniceHelixAdmin internalAdmin;
74-
StoreSchemaService storeSchemaService;
74+
StoreSchemaManager storeSchemaManager;
7575
VeniceControllerClusterConfig config;
7676
ZkClient zkClient;
7777
VeniceWriter veniceWriter;
@@ -96,15 +96,15 @@ public void setupInternalMocks() {
9696
doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopicRepository.getTopic(topicName));
9797

9898
internalAdmin = mock(VeniceHelixAdmin.class);
99-
storeSchemaService = mock(StoreSchemaService.class);
100-
doReturn(storeSchemaService).when(internalAdmin).getStoreSchemaService();
99+
storeSchemaManager = mock(StoreSchemaManager.class);
100+
doReturn(storeSchemaManager).when(internalAdmin).getStoreSchemaManager();
101101
doReturn(topicManager).when(internalAdmin).getTopicManager();
102102
SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA);
103103
doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString());
104104
// Outside a store-migration context, schema normalization is a passthrough. Mirror that here so
105-
// the mocked schema service doesn't return null and blank out the value schema during createStore/
105+
// the mocked schema manager doesn't return null and blank out the value schema during createStore/
106106
// addValueSchema.
107-
when(storeSchemaService.normalizeSchemaForMigration(anyString(), anyString(), any()))
107+
when(storeSchemaManager.normalizeSchemaForMigration(anyString(), anyString(), any()))
108108
.thenAnswer(invocation -> invocation.getArgument(2));
109109

110110
zkClient = mock(ZkClient.class);

0 commit comments

Comments
 (0)