From c6319ba75f2a6fd1ba57a8b48bc8b3dc23bc3960 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Thu, 25 Jan 2024 19:14:26 -0800 Subject: [PATCH] [controller] Turn off A/A and inc push when turning off hybrid store config (#824) This PR adjusts the existing behavior that throws exception when trying to turn off hybrid on an inc push enabled store. This was added 1.5 year ago as part of tactical fix to prevent store to lose RT right away especially for inc push store. Based on my code reading, we will not delete RT right away, if there is active hybrid version. If that is true, the change should be considered safe. The new behavior is: When disabling hybrid store config: If user also try to enable A/A or inc push together, we should fail loudly. Otherwise, we will disable A/A and inc push store config. --- .../controller/VeniceParentHelixAdmin.java | 37 +++++++---- .../AbstractTestVeniceParentHelixAdmin.java | 3 + .../TestVeniceParentHelixAdmin.java | 62 +++++++++++++++++++ 3 files changed, 90 insertions(+), 12 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index fdc43e6e779..38256dfa4b8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -2313,7 +2313,20 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa // Check if the store is being converted to a hybrid store boolean storeBeingConvertedToHybrid = !currStore.isHybrid() && updatedHybridStoreConfig != null && veniceHelixAdmin.isHybrid(updatedHybridStoreConfig); - + // Check if the store is being converted to a batch store + boolean storeBeingConvertedToBatch = currStore.isHybrid() && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig); + if (storeBeingConvertedToBatch && activeActiveReplicationEnabled.orElse(false)) { + throw new VeniceHttpException( + HttpStatus.SC_BAD_REQUEST, + "Cannot convert store to batch-only and enable Active/Active together.", + ErrorType.BAD_REQUEST); + } + if (storeBeingConvertedToBatch && incrementalPushEnabled.orElse(false)) { + throw new VeniceHttpException( + HttpStatus.SC_BAD_REQUEST, + "Cannot convert store to batch-only and enable incremental push together.", + ErrorType.BAD_REQUEST); + } // Update active-active replication config. setStore.activeActiveReplicationEnabled = activeActiveReplicationEnabled .map(addToUpdatedConfigList(updatedConfigsList, ACTIVE_ACTIVE_REPLICATION_ENABLED)) @@ -2325,6 +2338,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa setStore.activeActiveReplicationEnabled = true; updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED); } + // When turning off hybrid store, we will also turn off A/A store config. + if (storeBeingConvertedToBatch && setStore.activeActiveReplicationEnabled) { + setStore.activeActiveReplicationEnabled = false; + updatedConfigsList.add(ACTIVE_ACTIVE_REPLICATION_ENABLED); + } + // Update incremental push config. setStore.incrementalPushEnabled = incrementalPushEnabled.map(addToUpdatedConfigList(updatedConfigsList, INCREMENTAL_PUSH_ENABLED)) @@ -2337,18 +2356,12 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa setStore.incrementalPushEnabled = true; updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED); } - - // If store is already hybrid then check to make sure the end state is valid. We do this because we allow enabling - // incremental push without enabling hybrid already (we will automatically convert to hybrid store with default - // configs). - if (veniceHelixAdmin.isHybrid(currStore.getHybridStoreConfig()) - && !veniceHelixAdmin.isHybrid(updatedHybridStoreConfig) && setStore.incrementalPushEnabled) { - throw new VeniceHttpException( - HttpStatus.SC_BAD_REQUEST, - "Cannot convert store to batch-only, incremental push enabled stores require valid hybrid configs. " - + "Please disable incremental push if you'd like to convert the store to batch-only", - ErrorType.BAD_REQUEST); + // When turning off hybrid store, we will also turn off incremental store config. + if (storeBeingConvertedToBatch && setStore.incrementalPushEnabled) { + setStore.incrementalPushEnabled = false; + updatedConfigsList.add(INCREMENTAL_PUSH_ENABLED); } + if (updatedHybridStoreConfig == null) { setStore.hybridStoreConfig = null; } else { diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java index 1865b2a5e46..33611e57c4b 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.linkedin.venice.authorization.AuthorizerService; import com.linkedin.venice.authorization.DefaultIdentityParser; @@ -22,6 +23,7 @@ import com.linkedin.venice.helix.ZkRoutersClusterManager; import com.linkedin.venice.helix.ZkStoreConfigAccessor; import com.linkedin.venice.kafka.TopicManager; +import com.linkedin.venice.meta.HybridStoreConfig; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.Store; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -84,6 +86,7 @@ public void setupInternalMocks() { doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopicRepository.getTopic(topicName)); internalAdmin = mock(VeniceHelixAdmin.class); + when(internalAdmin.isHybrid((HybridStoreConfig) any())).thenCallRealMethod(); doReturn(topicManager).when(internalAdmin).getTopicManager(); SchemaEntry mockEntry = new SchemaEntry(0, TEST_SCHEMA); doReturn(mockEntry).when(internalAdmin).getKeySchema(anyString(), anyString()); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index 926b5337442..bd4dd318fdd 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -1846,6 +1846,68 @@ public void testUpdateStoreNativeReplicationSourceFabric() { "Native replication source fabric does not match after updating the store!"); } + @Test + public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() { + String storeName = Utils.getUniqueString("testUpdateStore"); + Store store = TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()); + + store.setHybridStoreConfig( + new HybridStoreConfigImpl( + 1000, + 100, + -1, + DataReplicationPolicy.NON_AGGREGATE, + BufferReplayPolicy.REWIND_FROM_EOP)); + store.setActiveActiveReplicationEnabled(true); + store.setIncrementalPushEnabled(true); + store.setNativeReplicationEnabled(true); + store.setChunkingEnabled(true); + doReturn(store).when(internalAdmin).getStore(clusterName, storeName); + + doReturn(CompletableFuture.completedFuture(new SimplePubSubProduceResultImpl(topicName, partitionId, 1, -1))) + .when(veniceWriter) + .put(any(), any(), anyInt()); + + when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) + .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + + parentAdmin.initStorageCluster(clusterName); + // When user disable hybrid but also try to manually turn on A/A or Incremental Push, update operation should fail + // loudly. + Assert.assertThrows( + () -> parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridRewindSeconds(-1) + .setHybridOffsetLagThreshold(-1) + .setActiveActiveReplicationEnabled(true))); + Assert.assertThrows( + () -> parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridRewindSeconds(-1) + .setHybridOffsetLagThreshold(-1) + .setIncrementalPushEnabled(true))); + + parentAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setHybridOffsetLagThreshold(-1).setHybridRewindSeconds(-1)); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor valueCaptor = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor schemaCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(veniceWriter, times(1)).put(keyCaptor.capture(), valueCaptor.capture(), schemaCaptor.capture()); + byte[] valueBytes = valueCaptor.getValue(); + int schemaId = schemaCaptor.getValue(); + AdminOperation adminMessage = adminOperationSerializer.deserialize(ByteBuffer.wrap(valueBytes), schemaId); + UpdateStore updateStore = (UpdateStore) adminMessage.payloadUnion; + Assert.assertFalse(internalAdmin.isHybrid(updateStore.getHybridStoreConfig())); + Assert.assertFalse(updateStore.incrementalPushEnabled); + Assert.assertFalse(updateStore.activeActiveReplicationEnabled); + } + @Test public void testSetStoreViewConfig() { String storeName = Utils.getUniqueString("testUpdateStore");