|
3 | 3 | import static com.linkedin.venice.ConfigKeys.ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY;
|
4 | 4 | import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_BATCH_ONLY_STORES;
|
5 | 5 | import static com.linkedin.venice.ConfigKeys.NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_HYBRID_STORES;
|
6 |
| -import static com.linkedin.venice.controller.VeniceHelixAdmin.VERSION_ID_UNSET; |
7 |
| -import static org.mockito.ArgumentMatchers.any; |
8 |
| -import static org.mockito.ArgumentMatchers.anyString; |
9 |
| -import static org.mockito.Mockito.doReturn; |
10 |
| -import static org.mockito.Mockito.mock; |
| 6 | +import static com.linkedin.venice.utils.TestUtils.assertCommand; |
| 7 | +import static org.testng.Assert.assertEquals; |
| 8 | +import static org.testng.Assert.assertTrue; |
11 | 9 |
|
| 10 | +import com.linkedin.venice.controllerapi.ControllerClient; |
12 | 11 | import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
|
13 |
| -import com.linkedin.venice.meta.Version; |
14 |
| -import com.linkedin.venice.pubsub.manager.TopicManager; |
15 |
| -import com.linkedin.venice.pubsub.manager.TopicManagerRepository; |
| 12 | +import com.linkedin.venice.integration.utils.ServiceFactory; |
| 13 | +import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; |
| 14 | +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; |
| 15 | +import com.linkedin.venice.meta.StoreInfo; |
| 16 | +import com.linkedin.venice.utils.TestUtils; |
| 17 | +import com.linkedin.venice.utils.Time; |
16 | 18 | import com.linkedin.venice.utils.Utils;
|
17 |
| -import java.io.IOException; |
18 |
| -import java.util.Optional; |
19 | 19 | import java.util.Properties;
|
| 20 | +import java.util.concurrent.TimeUnit; |
20 | 21 | import org.testng.Assert;
|
21 | 22 | import org.testng.annotations.AfterClass;
|
22 | 23 | import org.testng.annotations.BeforeClass;
|
23 | 24 | import org.testng.annotations.Test;
|
24 | 25 |
|
25 | 26 |
|
26 |
| -public class TestClusterLevelConfigForNativeReplication extends AbstractTestVeniceHelixAdmin { |
| 27 | +public class TestClusterLevelConfigForNativeReplication { |
| 28 | + private static final long TEST_TIMEOUT = 60 * Time.MS_PER_SECOND; |
| 29 | + |
| 30 | + private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper; |
| 31 | + private ControllerClient parentControllerClient; |
| 32 | + |
27 | 33 | @BeforeClass(alwaysRun = true)
|
28 |
| - public void setUp() throws Exception { |
29 |
| - setupCluster(); |
| 34 | + public void setUp() { |
| 35 | + Utils.thisIsLocalhost(); |
| 36 | + Properties parentControllerProps = new Properties(); |
| 37 | + // enable native replication for batch-only stores through cluster-level config |
| 38 | + parentControllerProps.setProperty(ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY, "true"); |
| 39 | + parentControllerProps.setProperty(NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_BATCH_ONLY_STORES, "dc-batch"); |
| 40 | + parentControllerProps.setProperty(NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_HYBRID_STORES, "dc-hybrid"); |
| 41 | + multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( |
| 42 | + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) |
| 43 | + .numberOfParentControllers(1) |
| 44 | + .numberOfChildControllers(1) |
| 45 | + .numberOfRouters(1) |
| 46 | + .numberOfServers(1) |
| 47 | + .parentControllerProperties(parentControllerProps) |
| 48 | + .build()); |
| 49 | + parentControllerClient = new ControllerClient( |
| 50 | + multiRegionMultiClusterWrapper.getClusterNames()[0], |
| 51 | + multiRegionMultiClusterWrapper.getControllerConnectString()); |
30 | 52 | }
|
31 | 53 |
|
32 | 54 | @AfterClass(alwaysRun = true)
|
33 |
| - public void cleanUp() { |
34 |
| - cleanupCluster(); |
35 |
| - } |
36 |
| - |
37 |
| - @Override |
38 |
| - Properties getControllerProperties(String clusterName) throws IOException { |
39 |
| - Properties props = super.getControllerProperties(clusterName); |
40 |
| - // enable native replication for batch-only stores through cluster-level config |
41 |
| - props.setProperty(ENABLE_NATIVE_REPLICATION_AS_DEFAULT_FOR_BATCH_ONLY, "true"); |
42 |
| - props.setProperty(NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_BATCH_ONLY_STORES, "dc-batch"); |
43 |
| - props.setProperty(NATIVE_REPLICATION_SOURCE_FABRIC_AS_DEFAULT_FOR_HYBRID_STORES, "dc-hybrid"); |
44 |
| - return props; |
| 55 | + public void tearDown() { |
| 56 | + Utils.closeQuietlyWithErrorLogged(parentControllerClient); |
| 57 | + Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper); |
45 | 58 | }
|
46 | 59 |
|
47 |
| - @Test |
| 60 | + @Test(timeOut = TEST_TIMEOUT) |
48 | 61 | public void testClusterLevelNativeReplicationConfigForNewStores() {
|
49 |
| - TopicManagerRepository originalTopicManagerRepository = veniceAdmin.getTopicManagerRepository(); |
50 |
| - |
51 |
| - TopicManager mockedTopicManager = mock(TopicManager.class); |
52 |
| - TopicManagerRepository mockedTopicManageRepository = mock(TopicManagerRepository.class); |
53 |
| - doReturn(mockedTopicManager).when(mockedTopicManageRepository).getLocalTopicManager(); |
54 |
| - doReturn(mockedTopicManager).when(mockedTopicManageRepository).getTopicManager(any(String.class)); |
55 |
| - doReturn(mockedTopicManager).when(mockedTopicManageRepository).getTopicManager(anyString()); |
56 |
| - veniceAdmin.setTopicManagerRepository(mockedTopicManageRepository); |
57 | 62 | String storeName = Utils.getUniqueString("test-store");
|
58 | 63 | String pushJobId1 = "test-push-job-id-1";
|
59 |
| - /** |
60 |
| - * Do not enable any store-level config for leader/follower mode or native replication feature. |
61 |
| - */ |
62 |
| - veniceAdmin.createStore(clusterName, storeName, "test-owner", KEY_SCHEMA, VALUE_SCHEMA); |
| 64 | + parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\""); |
| 65 | + parentControllerClient.emptyPush(storeName, pushJobId1, 1); |
63 | 66 |
|
64 |
| - /** |
65 |
| - * Add a version |
66 |
| - */ |
67 |
| - veniceAdmin.addVersionAndTopicOnly( |
68 |
| - clusterName, |
69 |
| - storeName, |
70 |
| - pushJobId1, |
71 |
| - VERSION_ID_UNSET, |
72 |
| - 1, |
73 |
| - 1, |
74 |
| - false, |
75 |
| - true, |
76 |
| - Version.PushType.BATCH, |
77 |
| - null, |
78 |
| - null, |
79 |
| - Optional.empty(), |
80 |
| - -1, |
81 |
| - 1, |
82 |
| - Optional.empty(), |
83 |
| - false); |
84 | 67 | // Version 1 should exist.
|
85 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getVersions().size(), 1); |
| 68 | + StoreInfo store = assertCommand(parentControllerClient.getStore(storeName)).getStore(); |
| 69 | + assertEquals(store.getVersions().size(), 1); |
86 | 70 | // native replication should be enabled by cluster-level config
|
87 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).isNativeReplicationEnabled(), true); |
88 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getNativeReplicationSourceFabric(), "dc-batch"); |
89 |
| - veniceAdmin.updateStore( |
90 |
| - clusterName, |
91 |
| - storeName, |
92 |
| - new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L)); |
93 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getNativeReplicationSourceFabric(), "dc-hybrid"); |
94 |
| - veniceAdmin.updateStore( |
95 |
| - clusterName, |
96 |
| - storeName, |
97 |
| - new UpdateStoreQueryParams().setHybridRewindSeconds(-1L).setHybridOffsetLagThreshold(-1L)); |
98 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getNativeReplicationSourceFabric(), "dc-batch"); |
99 |
| - veniceAdmin.updateStore( |
100 |
| - clusterName, |
101 |
| - storeName, |
102 |
| - new UpdateStoreQueryParams().setIncrementalPushEnabled(true) |
103 |
| - .setHybridRewindSeconds(1L) |
104 |
| - .setHybridOffsetLagThreshold(10)); |
105 |
| - Assert.assertEquals(veniceAdmin.getStore(clusterName, storeName).getNativeReplicationSourceFabric(), "dc-hybrid"); |
106 |
| - |
107 |
| - // Set topic original topic manager back |
108 |
| - veniceAdmin.setTopicManagerRepository(originalTopicManagerRepository); |
| 71 | + assertTrue(store.isNativeReplicationEnabled()); |
| 72 | + assertEquals(store.getNativeReplicationSourceFabric(), "dc-batch"); |
| 73 | + assertCommand( |
| 74 | + parentControllerClient.updateStore( |
| 75 | + storeName, |
| 76 | + new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L))); |
| 77 | + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { |
| 78 | + Assert.assertEquals( |
| 79 | + parentControllerClient.getStore(storeName).getStore().getNativeReplicationSourceFabric(), |
| 80 | + "dc-hybrid"); |
| 81 | + }); |
| 82 | + assertCommand( |
| 83 | + parentControllerClient.updateStore( |
| 84 | + storeName, |
| 85 | + new UpdateStoreQueryParams().setHybridRewindSeconds(-1L).setHybridOffsetLagThreshold(-1L))); |
| 86 | + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { |
| 87 | + Assert.assertEquals( |
| 88 | + parentControllerClient.getStore(storeName).getStore().getNativeReplicationSourceFabric(), |
| 89 | + "dc-batch"); |
| 90 | + }); |
| 91 | + assertCommand( |
| 92 | + parentControllerClient.updateStore( |
| 93 | + storeName, |
| 94 | + new UpdateStoreQueryParams().setIncrementalPushEnabled(true) |
| 95 | + .setHybridRewindSeconds(1L) |
| 96 | + .setHybridOffsetLagThreshold(10))); |
| 97 | + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { |
| 98 | + Assert.assertEquals( |
| 99 | + parentControllerClient.getStore(storeName).getStore().getNativeReplicationSourceFabric(), |
| 100 | + "dc-hybrid"); |
| 101 | + }); |
109 | 102 | }
|
110 | 103 | }
|
0 commit comments