diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index fdc43e6e779..38256dfa4b8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -2313,7 +2313,20 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa // Check if the store is being converted to a hybrid store boolean storeBeingConvertedToHybrid = !currStore.isHybrid() && updatedHybridStoreConfig != null && veniceHelixAdmin.isHybrid(updatedHybridStoreConfig); - + // Check if the store is being converted to a batch store + boolean storeBeingConvertedToBatch = currStore.isHybrid() && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig); + if (storeBeingConvertedToBatch && activeActiveReplicationEnabled.orElse(false)) { + throw new VeniceHttpException( + HttpStatus.SC_BAD_REQUEST, + "Cannot convert store to batch-only and enable Active/Active together.", + ErrorType.BAD_REQUEST); + } + if (storeBeingConvertedToBatch && incrementalPushEnabled.orElse(false)) { + throw new VeniceHttpException( + HttpStatus.SC_BAD_REQUEST, + "Cannot convert store to batch-only and enable incremental push together.", + ErrorType.BAD_REQUEST); + } // Update active-active replication config. setStore.activeActiveReplicationEnabled = activeActiveReplicationEnabled .map(addToUpdatedConfigList(updatedConfigsList, ACTIVE_ACTIVE_REPLICATION_ENABLED)) @@ -2325,6 +2338,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa setStore.activeActiveReplicationEnabled = true; updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED); } + // When turning off hybrid store, we will also turn off A/A store config. + if (storeBeingConvertedToBatch && setStore.activeActiveReplicationEnabled) { + setStore.activeActiveReplicationEnabled = false; + updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED); + } + // Update incremental push config. setStore.incrementalPushEnabled = incrementalPushEnabled.map(addToUpdatedConfigList(updatedConfigsList, INCREMENTAL_PUSH_ENABLED)) @@ -2337,18 +2356,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa setStore.incrementalPushEnabled = true; updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED); } - - // If store is already hybrid then check to make sure the end state is valid. We do this because we allow enabling - // incremental push without enabling hybrid already (we will automatically convert to hybrid store with default - // configs). - if (veniceHelixAdmin.isHybrid(currStore.getHybridStoreConfig()) - && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig) && setStore.incrementalPushEnabled) { - throw new VeniceHttpException( - HttpStatus.SC_BAD_REQUEST, - "Cannot convert store to batch-only, incremental push enabled stores require valid hybrid configs. " - + "Please disable incremental push if you'd like to convert the store to batch-only", - ErrorType.BAD_REQUEST); + // When turning off hybrid store, we will also turn off incremental store config. + if (storeBeingConvertedToBatch && setStore.incrementalPushEnabled) { + setStore.incrementalPushEnabled = false; + updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED); } + if (updatedHybridStoreConfig == null) { setStore.hybridStoreConfig = null; } else { diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java index 1865b2a5e46..33611e57c4b 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.authorization.DefaultIdentityParser; @@ -22,6 +23,7 @@ import com.linkedin.venice.helix.ZkRoutersClusterManager; import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.kafka.TopicManager; +import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Store; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -84,6 +86,7 @@ public void setupInternalMocks() { doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopicRepository.getTopic(topicName)); internalAdmin = mock(VeniceHelixAdmin.class); + when(internalAdmin.isHybrid((HybridStoreConfig) any())).thenCallRealMethod(); doReturn(topicManager).when(internalAdmin).getTopicManager(); SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA); doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 926b5337442..bd4dd318fdd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -1846,6 +1846,68 @@ public void testUpdateStoreNativeReplicationSourceFabric() { "Native replication source fabric does not match after updating the store!"); } + @Test + public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + + store.setHybridStoreConfig( + new HybridStoreConfigImpl( + 1000, + 100, + -1, + DataReplicationPolicy.NON_AGGREGATE, + BufferReplayPolicy.REWIND_FROM_EOP)); + store.setActiveActiveReplicationEnabled(true); + store.setIncrementalPushEnabled(true); + store.setNativeReplicationEnabled(true); + store.setChunkingEnabled(true); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + // When user disable hybrid but also try to manually turn on A/A or Incremental Push, update operation should fail + // loudly. + Assert.assertThrows( + () -> parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridRewindSeconds(-1) + .setHybridOffsetLagThreshold(-1) + .setActiveActiveReplicationEnabled(true))); + Assert.assertThrows( + () -> parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridRewindSeconds(-1) + .setHybridOffsetLagThreshold(-1) + .setIncrementalPushEnabled(true))); + + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridOffsetLagThreshold(-1).setHybridRewindSeconds(-1)); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertFalse(internalAdmin.isHybrid(updateStore.getHybridStoreConfig())); + Assert.assertFalse(updateStore.incrementalPushEnabled); + Assert.assertFalse(updateStore.activeActiveReplicationEnabled); + } + @Test public void testSetStoreViewConfig() { String storeName = Utils.getUniqueString("testUpdateStore");