Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/actions/integration-test-run/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,31 @@ inputs:
shard:
description: 'Shard number (e.g., 1, 2, 99)'
required: true
max-jitter-seconds:
description: 'Upper bound for the random start jitter that de-correlates shard setup (git fetch, dependency resolution). Set to 0 to disable.'
required: false
default: '30'

runs:
using: 'composite'
steps:
- name: Jitter start time of the jobs not to stampede CI system and build infrastructure
shell: bash
env:
MAX_JITTER_SECONDS: ${{ inputs.max-jitter-seconds }}
run: |
max_jitter_seconds="$MAX_JITTER_SECONDS"

if ! [[ "$max_jitter_seconds" =~ ^[0-9]+$ ]]; then
echo "Invalid max jitter window: $max_jitter_seconds"
exit 1
fi

if [ "$max_jitter_seconds" -gt 0 ]; then
jitter_seconds=$(( RANDOM % (max_jitter_seconds + 1) ))
echo "Jittering shard start by ${jitter_seconds}s"
sleep "$jitter_seconds"
fi
- uses: ./.github/actions/gradle-test-setup
- name: Run Integration Tests
shell: bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,99 @@ public void testEnableActiveActiveReplicationSchema() {
}
}

/**
* Pins down the RMD-schema handling for the {@code VeniceParentHelixAdmin#addValueSchema} branch where the generated
* superset equals the EXISTING superset (i.e. {@code compareSchema(newSuperSetSchema, existingValueSchema)} is true
* and {@code doUpdateSupersetSchemaID = false}).
*
* <p>Scenario:
* <ul>
* <li>value schema 1: {@code {f0, f1}}</li>
* <li>value schema 2: {@code {f0, f1, f2}} — a strict superset of v1, so it becomes the store's superset (id 2)</li>
* <li>value schema 3: {@code {f0}} — a subset of the superset, so the generated superset equals the existing
* superset and the superset id is NOT bumped</li>
* </ul>
*
* <p>{@link #testEnableActiveActiveReplicationSchema()} establishes the contract that every value schema of an
* active-active store gets its own RMD schema. When value schema 3 is registered, the controller updates the RMD
* schema for {@code getSupersetOrLatestValueSchema} (the existing superset, id 2) instead of the newly added value
* schema (id 3), so value schema 3 never gets an RMD schema. This test asserts the contract and therefore fails on
* that buggy behavior.
*/
@Test(timeOut = TEST_TIMEOUT)
public void testReplicationMetadataSchemaForSubsetValueSchemaOfActiveActiveStore() {
String clusterName = CLUSTER_NAMES[0];
String storeName = Utils.getUniqueString("aa_subset_schema_store");
String valueSchemaV1Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0},"
+ "{\"name\":\"f1\",\"type\":\"string\",\"default\":\"\"}]}";
// Strict superset of V1 (adds f2). Becomes the store's superset schema (value schema id 2).
String valueSchemaV2Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0},"
+ "{\"name\":\"f1\",\"type\":\"string\",\"default\":\"\"},"
+ "{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
// Subset of the superset (only f0). Its generated superset equals the existing superset, so the superset id is
// NOT bumped when this schema (value schema id 3) is registered.
String valueSchemaV3Str = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":["
+ "{\"name\":\"f0\",\"type\":\"int\",\"default\":0}]}";
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs);
ControllerClient dc0Client =
new ControllerClient(clusterName, childDatacenters.get(0).getControllerConnectString())) {
// Create the store with value schema 1.
NewStoreResponse newStoreResponse = parentControllerClient
.retryableRequest(5, c -> c.createNewStore(storeName, "", "\"string\"", valueSchemaV1Str));
Assert.assertFalse(newStoreResponse.isError(), "createNewStore failed: " + newStoreResponse.getError());

// Enable active-active replication (so RMD schemas are maintained) and read computation (so the controller
// maintains a superset schema).
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setReadComputationEnabled(true);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreParams);

// Register the superset schema (value schema 2). It becomes the store's superset (value schema id 2).
SchemaResponse addV2Response =
parentControllerClient.retryableRequest(5, c -> c.addValueSchema(storeName, valueSchemaV2Str));
Assert.assertFalse(addV2Response.isError(), "addValueSchema(V2) failed: " + addV2Response.getError());

// Register the subset schema (value schema 3). Its generated superset equals the existing superset.
SchemaResponse addV3Response =
parentControllerClient.retryableRequest(5, c -> c.addValueSchema(storeName, valueSchemaV3Str));
Assert.assertFalse(addV3Response.isError(), "addValueSchema(V3) failed: " + addV3Response.getError());

Admin veniceHelixAdmin = childDatacenters.get(0).getControllers().values().iterator().next().getVeniceAdmin();

// Sanity check: three value schemas exist and the superset id stayed at 2 (i.e. registering the subset schema
// did NOT bump the superset), confirming we exercised the compareSchema(newSuperSet, existingValueSchema) branch.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
StoreResponse storeResponse = dc0Client.getStore(storeName);
Assert.assertFalse(storeResponse.isError());
StoreInfo storeInfo = storeResponse.getStore();
Assert.assertTrue(storeInfo.isActiveActiveReplicationEnabled());
assertEquals(storeInfo.getLatestSuperSetValueSchemaId(), 2, "Superset schema id should remain unchanged");
assertEquals(dc0Client.getAllValueSchema(storeName).getSchemas().length, 3, "There should be 3 value schemas");
});

// Every value schema of an active-active store should have its own RMD schema (the contract asserted by
// testEnableActiveActiveReplicationSchema). Verify the newly added subset value schema (id 3).
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, true, () -> {
Collection<RmdSchemaEntry> replicationMetadataSchemas =
veniceHelixAdmin.getReplicationMetadataSchemas(clusterName, storeName);
List<Integer> rmdValueSchemaIds = new ArrayList<>();
for (RmdSchemaEntry rmdSchemaEntry: replicationMetadataSchemas) {
rmdValueSchemaIds.add(rmdSchemaEntry.getValueSchemaID());
}
Assert.assertTrue(
rmdValueSchemaIds.contains(3),
"Active-active store is missing an RMD schema for value schema id 3 (the subset schema). RMD schemas "
+ "exist only for value schema ids " + rmdValueSchemaIds
+ ". The controller updated RMD for the existing superset (id 2) instead of the newly added "
+ "value schema (id 3).");
});
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testStoreRollbackToBackupVersion() {
String clusterName = CLUSTER_NAMES[0];
Expand Down
Loading
Loading