Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,99 @@ public void testEnableActiveActiveReplicationSchema() {
}
}

/**
* Pins down the RMD-schema handling for the {@code VeniceParentHelixAdmin#addValueSchema} branch where the generated
* superset equals the EXISTING superset (i.e. {@code compareSchema(newSuperSetSchema, existingValueSchema)} is true
* and {@code doUpdateSupersetSchemaID = false}).
*
* <p>Scenario:
* <ul>
* <li>value schema 1: {@code {f0, f1}}</li>
* <li>value schema 2: {@code {f0, f1, f2}} — a strict superset of v1, so it becomes the store's superset (id 2)</li>
* <li>value schema 3: {@code {f0}} — a subset of the superset, so the generated superset equals the existing
* superset and the superset id is NOT bumped</li>
* </ul>
*
* <p>{@link #testEnableActiveActiveReplicationSchema()} establishes the contract that every value schema of an
* active-active store gets its own RMD schema. When value schema 3 is registered, the controller updates the RMD
* schema for {@code getSupersetOrLatestValueSchema} (the existing superset, id 2) instead of the newly added value
* schema (id 3), so value schema 3 never gets an RMD schema. This test asserts the contract and therefore fails on
* that buggy behavior.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testReplicationMetadataSchemaForSubsetValueSchemaOfActiveActiveStore() {
String clusterName = CLUSTER_NAMES[0];
String storeName = Utils.getUniqueString("aa_subset_schema_store");
String valueSchemaV1Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0},"
+ "{\"name\":\"f1\",\"type\":\"string\",\"default\":\"\"}]}";
// Strict superset of V1 (adds f2). Becomes the store's superset schema (value schema id 2).
String valueSchemaV2Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0},"
+ "{\"name\":\"f1\",\"type\":\"string\",\"default\":\"\"},"
+ "{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
// Subset of the superset (only f0). Its generated superset equals the existing superset, so the superset id is
// NOT bumped when this schema (value schema id 3) is registered.
String valueSchemaV3Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0}]}";
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs);
ControllerClient dc0Client =
new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString())) {
// Create the store with value schema 1.
NewStoreResponse newStoreResponse = parentControllerClient
.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", valueSchemaV1Str));
Assert.assertFalse(newStoreResponse.isError(), "createNewStore failed: " + newStoreResponse.getError());

// Enable active-active replication (so RMD schemas are maintained) and read computation (so the controller
// maintains a superset schema).
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setReadComputationEnabled(true);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

// Register the superset schema (value schema 2). It becomes the store's superset (value schema id 2).
SchemaResponse addV2Response =
parentControllerClient.retryableRequest(5, c -> c.addValueSchema(storeName, valueSchemaV2Str));
Assert.assertFalse(addV2Response.isError(), "addValueSchema(V2) failed: " + addV2Response.getError());

// Register the subset schema (value schema 3). Its generated superset equals the existing superset.
SchemaResponse addV3Response =
parentControllerClient.retryableRequest(5, c -> c.addValueSchema(storeName, valueSchemaV3Str));
Assert.assertFalse(addV3Response.isError(), "addValueSchema(V3) failed: " + addV3Response.getError());

Admin veniceHelixAdmin = childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin();

// Sanity check: three value schemas exist and the superset id stayed at 2 (i.e. registering the subset schema
// did NOT bump the superset), confirming we exercised the compareSchema(newSuperSet, existingValueSchema) branch.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc0Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
StoreInfo storeInfo = storeResponse.getStore();
Assert.assertTrue(storeInfo.isActiveActiveReplicationEnabled());
assertEquals(storeInfo.getLatestSuperSetValueSchemaId(), 2, "Superset schema id should remain unchanged");
assertEquals(dc0Client.getAllValueSchema(storeName).getSchemas().length, 3, "There should be 3 value schemas");
});

// Every value schema of an active-active store should have its own RMD schema (the contract asserted by
// testEnableActiveActiveReplicationSchema). Verify the newly added subset value schema (id 3).
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
Collection<RmdSchemaEntry> replicationMetadataSchemas =
veniceHelixAdmin.getReplicationMetadataSchemas(clusterName, storeName);
List<Integer> rmdValueSchemaIds = new ArrayList<>();
for (RmdSchemaEntry rmdSchemaEntry: replicationMetadataSchemas) {
rmdValueSchemaIds.add(rmdSchemaEntry.getValueSchemaID());
}
Assert.assertTrue(
rmdValueSchemaIds.contains(3),
"Active-active store is missing an RMD schema for value schema id 3 (the subset schema). RMD schemas "
+ "exist only for value schema ids " + rmdValueSchemaIds
+ ". The controller updated RMD for the existing superset (id 2) instead of the newly added "
+ "value schema (id 3).");
});
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testStoreRollbackToBackupVersion() {
String clusterName = CLUSTER_NAMES[0];
Expand Down
Loading
Loading