|
| 1 | +package com.linkedin.venice.controller; |
| 2 | + |
| 3 | +import static com.linkedin.venice.ConfigKeys.ADMIN_HELIX_MESSAGING_CHANNEL_ENABLED; |
| 4 | +import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; |
| 5 | +import static com.linkedin.venice.ConfigKeys.CLUSTER_TO_D2; |
| 6 | +import static com.linkedin.venice.ConfigKeys.CLUSTER_TO_SERVER_D2; |
| 7 | +import static com.linkedin.venice.ConfigKeys.CONTROLLER_ADD_VERSION_VIA_ADMIN_PROTOCOL; |
| 8 | +import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_ZK_ADDRESSS; |
| 9 | +import static com.linkedin.venice.ConfigKeys.CONTROLLER_SSL_ENABLED; |
| 10 | +import static com.linkedin.venice.ConfigKeys.CONTROLLER_SYSTEM_SCHEMA_CLUSTER_NAME; |
| 11 | +import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS; |
| 12 | +import static com.linkedin.venice.ConfigKeys.DEFAULT_PARTITION_SIZE; |
| 13 | +import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; |
| 14 | +import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR; |
| 15 | +import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_STORE_ENABLED; |
| 16 | +import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; |
| 17 | +import static org.testng.Assert.assertFalse; |
| 18 | +import static org.testng.Assert.assertNotNull; |
| 19 | +import static org.testng.Assert.assertTrue; |
| 20 | + |
| 21 | +import com.linkedin.venice.integration.utils.D2TestUtils; |
| 22 | +import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; |
| 23 | +import com.linkedin.venice.integration.utils.ServiceFactory; |
| 24 | +import com.linkedin.venice.integration.utils.ZkServerWrapper; |
| 25 | +import com.linkedin.venice.pubsub.PubSubTopicRepository; |
| 26 | +import com.linkedin.venice.utils.TestUtils; |
| 27 | +import com.linkedin.venice.utils.Time; |
| 28 | +import com.linkedin.venice.utils.Utils; |
| 29 | +import com.linkedin.venice.utils.VeniceProperties; |
| 30 | +import io.tehuti.metrics.MetricsRepository; |
| 31 | +import java.util.Collections; |
| 32 | +import java.util.List; |
| 33 | +import java.util.Optional; |
| 34 | +import java.util.Properties; |
| 35 | +import java.util.concurrent.TimeUnit; |
| 36 | +import org.apache.helix.manager.zk.ZKHelixAdmin; |
| 37 | +import org.testng.annotations.AfterClass; |
| 38 | +import org.testng.annotations.AfterMethod; |
| 39 | +import org.testng.annotations.BeforeClass; |
| 40 | +import org.testng.annotations.Test; |
| 41 | + |
| 42 | + |
| 43 | +/** |
| 44 | + * Proves that a Venice controller can be pointed at TWO separate ZooKeeper ensembles: |
| 45 | + * <ul> |
| 46 | + * <li>{@code zookeeper.address} (storage ZK) – holds the Venice storage-cluster Helix metadata |
| 47 | + * (storage cluster, its resources, store/version znodes).</li> |
| 48 | + * <li>{@code controller.cluster.zk.address} (controller ZK) – holds the controller-cluster Helix |
| 49 | + * metadata (the {@code venice-controllers} cluster: leader election, participant registration, |
| 50 | + * storage-cluster-to-controller assignment) and the HAAS grand cluster.</li> |
| 51 | + * </ul> |
| 52 | + * |
| 53 | + * This is the "work-backwards" integration test Xun asked for: it runs the controller bring-up once |
| 54 | + * with a SHARED ZK (the default/aliasing path) and once with SPLIT ZK, and asserts via a |
| 55 | + * ZkInspector-style probe ({@link ZKHelixAdmin#getClusters()} against each ensemble) that the |
| 56 | + * controller-cluster znodes and the storage-cluster znodes land on the expected — and only the |
| 57 | + * expected — ensemble. |
| 58 | + * |
| 59 | + * The split-ZK branch in {@link ZkHelixAdminClient} and {@code connectToControllerCluster} in |
| 60 | + * {@link VeniceHelixAdmin} is exercised here for the first time; with |
| 61 | + * {@code controller.cluster.zk.address} defaulting to {@code zookeeper.address}, no existing test |
| 62 | + * ever drives the two addresses apart. |
| 63 | + */ |
| 64 | +public class TestVeniceControllerZkClientIsolation { |
| 65 | + private static final long TIMEOUT_MS = 60 * Time.MS_PER_SECOND; |
| 66 | + |
| 67 | + private ZkServerWrapper storageZk; |
| 68 | + private ZkServerWrapper controllerZk; |
| 69 | + private PubSubBrokerWrapper pubSubBrokerWrapper; |
| 70 | + |
| 71 | + private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); |
| 72 | + |
| 73 | + private VeniceHelixAdmin veniceAdmin; |
| 74 | + private String clusterName; |
| 75 | + private String controllerClusterName; |
| 76 | + |
| 77 | + @BeforeClass |
| 78 | + public void setUp() { |
| 79 | + Utils.thisIsLocalhost(); |
| 80 | + storageZk = ServiceFactory.getZkServer(); |
| 81 | + controllerZk = ServiceFactory.getZkServer(); |
| 82 | + pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(); |
| 83 | + } |
| 84 | + |
| 85 | + @AfterMethod(alwaysRun = true) |
| 86 | + public void tearDownAdmin() { |
| 87 | + Utils.closeQuietlyWithErrorLogged(veniceAdmin); |
| 88 | + veniceAdmin = null; |
| 89 | + } |
| 90 | + |
| 91 | + @AfterClass(alwaysRun = true) |
| 92 | + public void tearDown() { |
| 93 | + Utils.closeQuietlyWithErrorLogged(pubSubBrokerWrapper); |
| 94 | + Utils.closeQuietlyWithErrorLogged(controllerZk); |
| 95 | + Utils.closeQuietlyWithErrorLogged(storageZk); |
| 96 | + } |
| 97 | + |
| 98 | + /** |
| 99 | + * SPLIT ZK: controller-cluster Helix metadata must live ONLY on the controller ZK, and the |
| 100 | + * storage-cluster Helix metadata must live ONLY on the storage ZK. |
| 101 | + */ |
| 102 | + @Test(timeOut = TIMEOUT_MS) |
| 103 | + public void testControllerClusterIsolatedFromStorageZk() { |
| 104 | + bringUpController(/* splitZk */ true); |
| 105 | + |
| 106 | + // Create a store so the storage cluster has real Venice metadata on the storage ZK. |
| 107 | + String storeName = Utils.getUniqueString("isolation-store"); |
| 108 | + veniceAdmin.createStore(clusterName, storeName, "owner", "\"string\"", "\"string\""); |
| 109 | + assertNotNull(veniceAdmin.getStore(clusterName, storeName), "Store should have been created"); |
| 110 | + |
| 111 | + List<String> clustersOnControllerZk = listHelixClusters(controllerZk.getAddress()); |
| 112 | + List<String> clustersOnStorageZk = listHelixClusters(storageZk.getAddress()); |
| 113 | + |
| 114 | + // Controller ZK owns the controller cluster, NOT the storage cluster. |
| 115 | + assertTrue( |
| 116 | + clustersOnControllerZk.contains(controllerClusterName), |
| 117 | + "Controller ZK " + controllerZk.getAddress() + " must hold the controller cluster '" + controllerClusterName |
| 118 | + + "', found: " + clustersOnControllerZk); |
| 119 | + assertFalse( |
| 120 | + clustersOnControllerZk.contains(clusterName), |
| 121 | + "Controller ZK must NOT hold the storage cluster '" + clusterName + "', found: " + clustersOnControllerZk); |
| 122 | + |
| 123 | + // Storage ZK owns the storage cluster, NOT the controller cluster. |
| 124 | + assertTrue( |
| 125 | + clustersOnStorageZk.contains(clusterName), |
| 126 | + "Storage ZK " + storageZk.getAddress() + " must hold the storage cluster '" + clusterName + "', found: " |
| 127 | + + clustersOnStorageZk); |
| 128 | + assertFalse( |
| 129 | + clustersOnStorageZk.contains(controllerClusterName), |
| 130 | + "Storage ZK must NOT hold the controller cluster '" + controllerClusterName + "', found: " |
| 131 | + + clustersOnStorageZk); |
| 132 | + } |
| 133 | + |
| 134 | + /** |
| 135 | + * SHARED ZK baseline (the default/aliasing path): with {@code controller.cluster.zk.address} == |
| 136 | + * {@code zookeeper.address}, BOTH clusters land on the single ZK and the second ZK stays empty of |
| 137 | + * Venice clusters. Guards against the split-ZK change regressing the common deployment. |
| 138 | + */ |
| 139 | + @Test(timeOut = TIMEOUT_MS) |
| 140 | + public void testSharedZkBaseline() { |
| 141 | + bringUpController(/* splitZk */ false); |
| 142 | + |
| 143 | + List<String> clustersOnStorageZk = listHelixClusters(storageZk.getAddress()); |
| 144 | + List<String> clustersOnControllerZk = listHelixClusters(controllerZk.getAddress()); |
| 145 | + |
| 146 | + assertTrue( |
| 147 | + clustersOnStorageZk.contains(controllerClusterName) && clustersOnStorageZk.contains(clusterName), |
| 148 | + "With shared ZK, both controller and storage clusters must live on the single ZK, found: " |
| 149 | + + clustersOnStorageZk); |
| 150 | + assertFalse( |
| 151 | + clustersOnControllerZk.contains(controllerClusterName) || clustersOnControllerZk.contains(clusterName), |
| 152 | + "The unused second ZK must not host any Venice cluster in the shared-ZK case, found: " |
| 153 | + + clustersOnControllerZk); |
| 154 | + } |
| 155 | + |
| 156 | + /** Builds and starts a real VeniceHelixAdmin (controller) wired to one or two ZK ensembles. */ |
| 157 | + private void bringUpController(boolean splitZk) { |
| 158 | + clusterName = Utils.getUniqueString("test-cluster"); |
| 159 | + |
| 160 | + Properties props = TestUtils.getPropertiesForControllerConfig(); |
| 161 | + props.put(ZOOKEEPER_ADDRESS, storageZk.getAddress()); |
| 162 | + if (splitZk) { |
| 163 | + props.put(CONTROLLER_CLUSTER_ZK_ADDRESSS, controllerZk.getAddress()); |
| 164 | + } |
| 165 | + props.put(CLUSTER_NAME, clusterName); |
| 166 | + props.put(KAFKA_REPLICATION_FACTOR, 1); |
| 167 | + props.put(KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress()); |
| 168 | + props.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 16); |
| 169 | + props.put(DEFAULT_PARTITION_SIZE, 10); |
| 170 | + props.put(CLUSTER_TO_D2, TestUtils.getClusterToD2String(Collections.singletonMap(clusterName, "dummy_d2"))); |
| 171 | + props.put( |
| 172 | + CLUSTER_TO_SERVER_D2, |
| 173 | + TestUtils.getClusterToD2String(Collections.singletonMap(clusterName, "dummy_server_d2"))); |
| 174 | + props.put(CONTROLLER_ADD_VERSION_VIA_ADMIN_PROTOCOL, true); |
| 175 | + props.put(ADMIN_HELIX_MESSAGING_CHANNEL_ENABLED, false); |
| 176 | + props.put(PARTICIPANT_MESSAGE_STORE_ENABLED, true); |
| 177 | + props.put(CONTROLLER_SYSTEM_SCHEMA_CLUSTER_NAME, clusterName); |
| 178 | + props.put(CONTROLLER_SSL_ENABLED, false); |
| 179 | + props.putAll(PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper))); |
| 180 | + |
| 181 | + VeniceControllerClusterConfig controllerConfig = new VeniceControllerClusterConfig(new VeniceProperties(props)); |
| 182 | + VeniceControllerMultiClusterConfig multiClusterConfig = |
| 183 | + TestUtils.getMultiClusterConfigFromOneCluster(controllerConfig); |
| 184 | + controllerClusterName = multiClusterConfig.getControllerClusterName(); |
| 185 | + |
| 186 | + veniceAdmin = new VeniceHelixAdmin( |
| 187 | + multiClusterConfig, |
| 188 | + new MetricsRepository(), |
| 189 | + D2TestUtils.getAndStartD2Client(storageZk.getAddress()), |
| 190 | + pubSubTopicRepository, |
| 191 | + pubSubBrokerWrapper.getPubSubClientsFactory(), |
| 192 | + pubSubBrokerWrapper.getPubSubPositionTypeRegistry(), |
| 193 | + Optional.empty(), |
| 194 | + Optional.empty(), |
| 195 | + Optional.empty()); |
| 196 | + veniceAdmin.initStorageCluster(clusterName); |
| 197 | + |
| 198 | + TestUtils.waitForNonDeterministicCompletion( |
| 199 | + TIMEOUT_MS, |
| 200 | + TimeUnit.MILLISECONDS, |
| 201 | + () -> veniceAdmin.isLeaderControllerFor(clusterName)); |
| 202 | + } |
| 203 | + |
| 204 | + /** ZkInspector-style probe: which Helix clusters physically exist on the given ZK ensemble. */ |
| 205 | + private static List<String> listHelixClusters(String zkAddress) { |
| 206 | + ZKHelixAdmin admin = new ZKHelixAdmin(zkAddress); |
| 207 | + try { |
| 208 | + return admin.getClusters(); |
| 209 | + } finally { |
| 210 | + admin.close(); |
| 211 | + } |
| 212 | + } |
| 213 | +} |
0 commit comments