Skip to content

Commit

Permalink
[admin-tool] Metadata migration from source ZK to destination ZK (#1056)
Browse files Browse the repository at this point in the history
* Implemented metadata migration from src zk to dest zk

* Refactored test cases and resolved spotbugs

* Added javadoc to ZkCopier class and moved cluster zk paths field to constants file

* Implemented comments and refactored getVenicePaths() that takes in a list or a tree

* Modified AdminTool command for metadata migration

- Implemented optional arguments to build ZK clients with ZK SSL config files
- Implemented more validation checks to copy over existing ZNode paths
- Refactored test cases

* Successful testing of metadata migration with ei zk server

- fixed read and write data issues in ZkClient with help from Kai+Nisarg
- removed unused test case

* Added note to migrateVenicePaths() to ensure that destination zk is new or doesn't contain metadata that needs to be migrated

* Addressed comments

* Reverted to previous iteration with minor changes

- Getting Venice-specific metadata from ZK client into a tree data structure
- Then, used pathsTreeToList() to convert tree to list
- Made source and destination ZK client SSL config files required arguments for metadata migration AdminTool command
  • Loading branch information
bonytoni authored Jul 23, 2024
1 parent 22a3b29 commit 79a13ba
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ public static void main(String[] args) throws Exception {
case DUMP_TOPIC_PARTITION_INGESTION_CONTEXT:
dumpTopicPartitionIngestionContext(cmd);
break;
case MIGRATE_VENICE_ZK_PATHS:
migrateVeniceZKPaths(cmd);
break;
case EXTRACT_VENICE_ZK_PATHS:
extractVeniceZKPaths(cmd);
break;
Expand Down Expand Up @@ -3094,6 +3097,22 @@ private static void dumpTopicPartitionIngestionContext(CommandLine cmd) throws E
}
}

private static void migrateVeniceZKPaths(CommandLine cmd) throws Exception {
Set<String> clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST));
String srcZKUrl = getRequiredArgument(cmd, Arg.SRC_ZOOKEEPER_URL);
String srcZKSSLConfigs = getRequiredArgument(cmd, Arg.SRC_ZK_SSL_CONFIG_FILE);
String destZKUrl = getRequiredArgument(cmd, Arg.DEST_ZOOKEEPER_URL);
String destZKSSLConfigs = getRequiredArgument(cmd, Arg.DEST_ZK_SSL_CONFIG_FILE);
ZkClient srcZkClient = readZKConfigAndBuildZKClient(srcZKUrl, srcZKSSLConfigs);
ZkClient destZkClient = readZKConfigAndBuildZKClient(destZKUrl, destZKSSLConfigs);
try {
ZkCopier.migrateVenicePaths(srcZkClient, destZkClient, clusterNames, getRequiredArgument(cmd, Arg.BASE_PATH));
} finally {
srcZkClient.close();
destZkClient.close();
}
}

private static void extractVeniceZKPaths(CommandLine cmd) {
Set<String> clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST));
ZkCopier.extractVenicePaths(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public enum Arg {
URL("url", "u", true, "Venice url, eg. http://localhost:1689 This can be a router or a controller"),
SERVER_URL("server-url", "su", true, "Venice server url, eg. http://localhost:1690 This has to be a storage node"),
VENICE_ZOOKEEPER_URL("venice-zookeeper-url", "vzu", true, "Venice Zookeeper url, eg. localhost:2622"),
SRC_ZOOKEEPER_URL("src-zookeeper-url", "szu", true, "Source Zookeeper url, eg. localhost:2181"),
DEST_ZOOKEEPER_URL("dest-zookeeper-url", "dzu", true, "Destination Zookeeper url, eg. localhost:2182"),
INFILE("infile", "if", true, "Path to input text file"), OUTFILE("outfile", "of", true, "Path to output text file"),
BASE_PATH("base-path", "bp", true, "Base path for ZK, eg. /venice-parent"),
CLUSTER("cluster", "c", true, "Name of Venice cluster"),
Expand All @@ -27,7 +29,10 @@ public enum Arg {
VALUE_SCHEMA_ID("value-schema-id", "vid", true, "value schema id"),
VALUE_SCHEMA("value-schema-file", "vs", true, "Path to text file with value schema"),
ZK_SSL_CONFIG_FILE("zk-ssl-config-file", "zscf", true, "Path to text file with ZK SSL configs"),
DERIVED_SCHEMA_ID("derived-schema-id", "did", true, "derived schema id"),
SRC_ZK_SSL_CONFIG_FILE("src-zk-ssl-config-file", "szscf", true, "Path to text file with source ZK SSL configs"),
DEST_ZK_SSL_CONFIG_FILE(
"dest-zk-ssl-config-file", "dzscf", true, "Path to text file with destination ZK SSL configs"
), DERIVED_SCHEMA_ID("derived-schema-id", "did", true, "derived schema id"),
DERIVED_SCHEMA("derived-schema-file", "ds", true, "Path to text file with derived schema"),
OWNER("owner", "o", true, "Owner email for new store creation"),
STORAGE_NODE("storage-node", "n", true, "Helix instance ID for a storage node, eg. lva1-app1234_1690"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static com.linkedin.venice.Arg.DERIVED_SCHEMA;
import static com.linkedin.venice.Arg.DERIVED_SCHEMA_ID;
import static com.linkedin.venice.Arg.DEST_FABRIC;
import static com.linkedin.venice.Arg.DEST_ZK_SSL_CONFIG_FILE;
import static com.linkedin.venice.Arg.DEST_ZOOKEEPER_URL;
import static com.linkedin.venice.Arg.DISABLE_DAVINCI_PUSH_STATUS_STORE;
import static com.linkedin.venice.Arg.DISABLE_META_STORE;
import static com.linkedin.venice.Arg.ENABLE_DISABLED_REPLICA;
Expand Down Expand Up @@ -101,6 +103,8 @@
import static com.linkedin.venice.Arg.SKIP_DIV;
import static com.linkedin.venice.Arg.SKIP_LAST_STORE_CREATION;
import static com.linkedin.venice.Arg.SOURCE_FABRIC;
import static com.linkedin.venice.Arg.SRC_ZK_SSL_CONFIG_FILE;
import static com.linkedin.venice.Arg.SRC_ZOOKEEPER_URL;
import static com.linkedin.venice.Arg.STARTING_OFFSET;
import static com.linkedin.venice.Arg.START_DATE;
import static com.linkedin.venice.Arg.STORAGE_NODE;
Expand Down Expand Up @@ -535,6 +539,11 @@ public enum Command {
"backup-store-metadata-from-graveyard", "Backup store metadata from graveyard in EI",
new Arg[] { VENICE_ZOOKEEPER_URL, ZK_SSL_CONFIG_FILE, BACKUP_FOLDER }
),
MIGRATE_VENICE_ZK_PATHS(
"migrate-venice-zk-paths", "Migrate Venice-specific metadata from a source ZK to a destination ZK",
new Arg[] { SRC_ZOOKEEPER_URL, SRC_ZK_SSL_CONFIG_FILE, DEST_ZOOKEEPER_URL, DEST_ZK_SSL_CONFIG_FILE, CLUSTER_LIST,
BASE_PATH }
),
EXTRACT_VENICE_ZK_PATHS(
"extract-venice-zk-paths",
"Extract Venice-specific paths from a ZK snapshot input text file to an output text file",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,72 @@
package com.linkedin.venice;

import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA;
import static com.linkedin.venice.zk.VeniceZkPaths.EXECUTION_IDS;
import static com.linkedin.venice.zk.VeniceZkPaths.PARENT_OFFLINE_PUSHES;
import static com.linkedin.venice.zk.VeniceZkPaths.ROUTERS;
import static com.linkedin.venice.zk.VeniceZkPaths.STORES;
import static com.linkedin.venice.zk.VeniceZkPaths.CLUSTER_ZK_PATHS;
import static com.linkedin.venice.zk.VeniceZkPaths.STORE_CONFIGS;
import static com.linkedin.venice.zk.VeniceZkPaths.STORE_GRAVEYARD;

import com.linkedin.venice.exceptions.VeniceException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.CreateMode;


/**
* <p>
* This class contains methods to 1)migrate Venice-specific metadata from a source ZooKeeper (ZK) to a destination ZK
* and 2)extract Venice-specific paths from an input text file containing ZK paths to an output text file.
* We implement a tree data structure to represent ZK paths as nested children and use it to filter out Venice-specific
* paths efficiently.
* </p>
*/
public class ZkCopier {
private static final Set<String> CLUSTER_ZK_PATHS = new HashSet<>(
Arrays.asList(ADMIN_TOPIC_METADATA, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));
private static final Logger LOGGER = LogManager.getLogger(ZkCopier.class);

/**
* Migrate Venice-specific metadata from a source ZK to a destination ZK.
* @param srcZkClient source ZK client
* @param destZkClient destination ZK client
* @param clusterNames set of cluster names
* @param basePath base path for ZK
* @Note: {@code destZkClient} should be a fresh ZK server or must not contain any Venice-specific metadata that needs to be migrated,
* otherwise a {@code ZkNodeExistsException} will be thrown
*/
public static void migrateVenicePaths(
ZkClient srcZkClient,
ZkClient destZkClient,
Set<String> clusterNames,
String basePath) {
if (!basePath.startsWith("/")) {
throw new VeniceException("Base path must start with a forward slash (/)");
}
if (!srcZkClient.exists(basePath)) {
throw new VeniceException("Base path does not exist in source ZK");
}
TreeNode srcZkPathsTree = getZkClientPathsTree(srcZkClient, clusterNames, basePath);
List<String> destZkPathsList = pathsTreeToList(srcZkPathsTree);
srcZkClient.setZkSerializer(new ByteArraySerializer());
destZkClient.setZkSerializer(new ByteArraySerializer());
LOGGER.info("Starting Migration");
for (String path: destZkPathsList) {
try {
destZkClient.create(path, srcZkClient.readData(path), CreateMode.PERSISTENT);
} catch (Exception e) {
LOGGER.error("Failed to create path: " + path, e);
throw new VeniceException(e);
}
}
LOGGER.info("Ending Migration");
}

/**
* Extract Venice-specific paths from an input text file to an output text file.
Expand All @@ -48,36 +89,45 @@ public static void extractVenicePaths(
List<String> zkPaths = FileUtils.readLines(inputFile);

// write Venice-specific paths to output file
List<String> venicePaths = getVenicePaths(zkPaths, clusterNames, basePath);
List<String> venicePaths = getVenicePathsFromList(zkPaths, clusterNames, basePath);
File outputFile = new File(outputPath);
FileUtils.writeLines(outputFile, venicePaths);
} catch (IOException e) {
throw new VeniceException(e.getMessage());
throw new VeniceException(e);
}
}

/**
* Get Venice-specific paths from a list of ZK paths filtered by 1)base path, 2)cluster names, and 3)required cluster ZK paths.
* Get Venice-specific paths from a list of ZK paths.
* @return a list of Venice-specific paths filtered from {@code zkPaths}
* @Note: Converts the list {@code zkPaths} to a tree and calls {@code getVenicePathsFromTree()}
*/
static List<String> getVenicePaths(List<String> zkPaths, Set<String> clusterNames, String basePath) {
TreeNode requiredPathsTreeRoot = buildRequiredPathsTree(clusterNames, basePath);
static List<String> getVenicePathsFromList(List<String> zkPaths, Set<String> clusterNames, String basePath) {
TreeNode zkPathsTreeRoot = pathsListToTree(zkPaths, basePath);
return getVenicePathsFromTree(zkPathsTreeRoot, clusterNames, basePath);
}

/**
* Get Venice-specific paths from a tree of ZK paths filtered by 1)base path, 2)cluster names, and 3)required cluster ZK paths.
* @return a list of Venice-specific paths filtered from {@code zkPathsTreeRoot}
*/
static List<String> getVenicePathsFromTree(TreeNode zkPathsTreeRoot, Set<String> clusterNames, String basePath) {
TreeNode requiredPathsTreeRoot = buildRequiredPathsTree(clusterNames, basePath);
if (!zkPathsTreeRoot.getName().equals(requiredPathsTreeRoot.getName())) {
throw new VeniceException(
"Base path mismatch: " + zkPathsTreeRoot.getName() + " != " + requiredPathsTreeRoot.getName());
}
getVenicePathsHelper(zkPathsTreeRoot, requiredPathsTreeRoot);
getVenicePathsFromTreeHelper(zkPathsTreeRoot, requiredPathsTreeRoot);
return pathsTreeToList(zkPathsTreeRoot);
}

/**
* Recursively match children nodes of {@code zkPathsTreeNode} and {@code requiredPathsTreeNode} and removes unmatched children nodes from {@code zkPathsTreeNode}.
* @param zkPathsTreeNode node in the ZK paths tree
* @param requiredPathsTreeNode node in the required paths tree
* @Note: This method directly modifies {@code zkPathsTreeNode} in the parent method {@code getVenicePaths()}
* @Note: This method directly modifies {@code zkPathsTreeNode} in the parent method {@code getVenicePathsFromTree()}
*/
private static void getVenicePathsHelper(TreeNode zkPathsTreeNode, TreeNode requiredPathsTreeNode) {
private static void getVenicePathsFromTreeHelper(TreeNode zkPathsTreeNode, TreeNode requiredPathsTreeNode) {
if (requiredPathsTreeNode.getChildren().isEmpty() || zkPathsTreeNode.getChildren().isEmpty()) {
return;
}
Expand All @@ -87,7 +137,7 @@ private static void getVenicePathsHelper(TreeNode zkPathsTreeNode, TreeNode requ
String zkChildName = zkEntry.getKey();
TreeNode zkChild = zkEntry.getValue();
if (requiredPathsTreeNode.containsChild(zkChildName)) {
getVenicePathsHelper(zkChild, requiredPathsTreeNode.getChildren().get(zkChildName));
getVenicePathsFromTreeHelper(zkChild, requiredPathsTreeNode.getChildren().get(zkChildName));
} else {
iterator.remove();
}
Expand All @@ -110,6 +160,47 @@ static TreeNode buildRequiredPathsTree(Set<String> clusterNames, String basePath
return root;
}

/**
* Build a tree of ZK paths with {@code basePath} and {@code clusterNames} by fetching children of each cluster path using the ZK client.
* @return the root (base path) of the tree
*/
static TreeNode getZkClientPathsTree(ZkClient zkClient, Set<String> clusterNames, String basePath) {
TreeNode root = new TreeNode(basePath);
List<String> basePathChildren = zkClient.getChildren(basePath);
if (basePathChildren.contains(STORE_CONFIGS)) {
TreeNode storeConfigsNode = root.addChild(STORE_CONFIGS);
String storeConfigsPath = basePath + "/" + STORE_CONFIGS;
addChildrenToTreeNode(zkClient, storeConfigsNode, storeConfigsPath);
}
for (String cluster: clusterNames) {
if (basePathChildren.contains(cluster)) {
TreeNode clusterNode = root.addChild(cluster);
String clusterPath = basePath + "/" + cluster;
List<String> clusterChildren = zkClient.getChildren(clusterPath);
for (String venicePath: CLUSTER_ZK_PATHS) {
if (clusterChildren.contains(venicePath)) {
TreeNode venicePathNode = clusterNode.addChild(venicePath);
String clusterVenicePath = clusterPath + "/" + venicePath;
addChildrenToTreeNode(zkClient, venicePathNode, clusterVenicePath);
}
}
}
}
return root;
}

/**
* Recursively add children to a {@code TreeNode} by fetching children of a ZK path using the ZK client.
* @Note: This method directly modifies the tree {@code node} in the parent method {@code getZkClientPathsTree()}
*/
private static void addChildrenToTreeNode(ZkClient zkClient, TreeNode node, String path) {
List<String> children = zkClient.getChildren(path);
for (String child: children) {
TreeNode childNode = node.addChild(child);
addChildrenToTreeNode(zkClient, childNode, path + "/" + child);
}
}

/**
* Convert a list of paths to a tree of paths as nested children.
* @return the root of the tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void testExtractVenicePaths() throws Exception {
}

@Test
public void testGetVenicePaths() {
public void testGetVenicePathsFromList() {
List<String> zkPaths = getPaths();
List<String> venicePaths = ZkCopier.getVenicePaths(zkPaths, CLUSTERS, BASE_PATH);
List<String> venicePaths = ZkCopier.getVenicePathsFromList(zkPaths, CLUSTERS, BASE_PATH);
Assert.assertEquals(venicePaths.size(), 22);
Assert.assertFalse(venicePaths.contains("/venice"));
Assert.assertFalse(venicePaths.contains("/venice/storeConfigs"));
Expand All @@ -65,35 +65,27 @@ public void testGetVenicePaths() {
Assert.assertFalse(venicePaths.contains("/venice/cluster1/adminTopicMetadata"));
Assert.assertFalse(venicePaths.contains("/venice/cluster1/adminTopicMetadata/file1"));
Assert.assertFalse(venicePaths.contains("/venice/cluster1/adminTopicMetadata/file2/file3"));
Assert.assertTrue(venicePaths.contains("/venice-parent"));
Assert.assertTrue(venicePaths.contains("/venice-parent/storeConfigs"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1"));
Assert.assertFalse(venicePaths.contains("/venice-parent/cluster1/storeConfigs"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/adminTopicMetadata"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/adminTopicMetadata/file1"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/adminTopicMetadata/file2"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/adminTopicMetadata/file2/file3"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/executionids"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/ParentOfflinePushes"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/routers"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/StoreGraveyard"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/Stores"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2"));
Assert.assertFalse(venicePaths.contains("/venice-parent/cluster2/storeConfigs"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/adminTopicMetadata"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/executionids"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/executionids/file1"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/executionids/file2"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/executionids/file2/file3"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/ParentOfflinePushes"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/routers"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/StoreGraveyard"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/Stores"));
Assert.assertFalse(venicePaths.contains("/venice-parent/helix-cluster"));
Assert.assertFalse(venicePaths.contains("/venice-parent/helix-cluster/storeConfigs"));
Assert.assertFalse(venicePaths.contains("/venice-parent/helix-cluster/adminTopicMetadata"));
Assert.assertFalse(venicePaths.contains("/venice-parent/helix-cluster/adminTopicMetadata/file1"));
Assert.assertFalse(venicePaths.contains("/venice-parent/helix-cluster/adminTopicMetadata/file2/file3"));
testVenicePathsContainsAsserts(venicePaths);
}

@Test
public void testGetVenicePathsFromTree() {
TreeNode root = ZkCopier.buildRequiredPathsTree(CLUSTERS, BASE_PATH);
List<String> venicePaths = ZkCopier.getVenicePathsFromTree(root, CLUSTERS, BASE_PATH);
testVenicePathsContainsAsserts(venicePaths);
}

@Test
Expand All @@ -120,6 +112,25 @@ public void testBuildRequiredPathsTree() {
}
}

private void testVenicePathsContainsAsserts(List<String> venicePaths) {
Assert.assertTrue(venicePaths.contains("/venice-parent"));
Assert.assertTrue(venicePaths.contains("/venice-parent/storeConfigs"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/adminTopicMetadata"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/executionids"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/ParentOfflinePushes"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/routers"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/StoreGraveyard"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster1/Stores"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/adminTopicMetadata"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/executionids"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/ParentOfflinePushes"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/routers"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/StoreGraveyard"));
Assert.assertTrue(venicePaths.contains("/venice-parent/cluster2/Stores"));
}

private void testContainsChildAsserts(TreeNode child) {
Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA));
Assert.assertTrue(child.containsChild(EXECUTION_IDS));
Expand Down
Loading

0 comments on commit 79a13ba

Please sign in to comment.