[dvc][server] Support separate inc push topic with different pubsub entries (#1262)

* [dvc][server] Support separate inc push topic with different pubsub entries
adamxchen authored Nov 2, 2024
1 parent 7e9aa79 commit 03c8302
Showing 10 changed files with 227 additions and 26 deletions.
@@ -29,6 +29,7 @@
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
@@ -149,6 +150,21 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
Map<String, String> tmpKafkaUrlResolution = new HashMap<>();

boolean foundBaseKafkaUrlInMappingIfItIsPopulated = kafkaClusterMap.isEmpty();
/**
 * The cluster ID, alias, and Kafka URL mappings are defined in the service config file.
 * To support multiple cluster ID mappings, we pass them as separate entries.
 * For example, a new cluster ID can be added with its alias and URL:
 * <entry key="2">
 *   <map>
 *     <entry key="name" value="region1_sep"/>
 *     <entry key="url" value="${venice.kafka.ssl.bootstrap.servers.region1}_sep"/>
 *   </map>
 * </entry>
 *
 * For the separate incremental push topic feature, we duplicate each entry with a "_sep" suffix and a
 * different cluster ID, so that two RT topics (the regular RT topic and the incremental push RT topic)
 * can be supported under different cluster IDs.
 */

for (Map.Entry<String, Map<String, String>> kafkaCluster: kafkaClusterMap.entrySet()) {
int clusterId = Integer.parseInt(kafkaCluster.getKey());
Map<String, String> mappings = kafkaCluster.getValue();
@@ -207,11 +223,11 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
/**
* If the {@link kafkaClusterIdToUrlMap} and {@link kafkaClusterUrlToIdMap} are equal in size, then it means
* that {@link KAFKA_CLUSTER_MAP_KEY_OTHER_URLS} was never specified in the {@link kafkaClusterMap}, in which
* case, the resolver needs not lookup anything, and it will always return the same as its input.
* case, the resolver need not look up anything, and it will always return the same input, with potential filtering.
*/
this.kafkaClusterUrlResolver = this.kafkaClusterIdToUrlMap.size() == this.kafkaClusterUrlToIdMap.size()
? String::toString
: url -> kafkaUrlResolution.getOrDefault(url, url);
? Utils::resolveKafkaUrlForSepTopic
: url -> Utils.resolveKafkaUrlForSepTopic(kafkaUrlResolution.getOrDefault(url, url));
this.kafkaBootstrapServers = this.kafkaClusterUrlResolver.apply(baseKafkaBootstrapServers);
if (this.kafkaBootstrapServers == null || this.kafkaBootstrapServers.isEmpty()) {
throw new ConfigurationException("kafkaBootstrapServers can't be empty");
@@ -364,4 +380,20 @@ public VeniceProperties getClusterProperties() {
public Map<String, Map<String, String>> getKafkaClusterMap() {
return kafkaClusterMap;
}

/**
 * For the separate incremental push topic feature, resolve a cluster ID back to the original one for monitoring
 * purposes: the incremental push topic uses the same pubsub cluster as the regular push topic, even though it
 * appears under a different cluster ID.
 * @param clusterId the cluster ID to resolve, possibly belonging to a "_sep" entry
 * @return the original cluster ID, or the given cluster ID if it does not belong to a "_sep" entry
 */
public int getEquivalentKafkaClusterIdForSepTopic(int clusterId) {
String alias = kafkaClusterIdToAliasMap.get(clusterId);
if (alias == null || !alias.endsWith(Utils.SEPARATE_TOPIC_SUFFIX)) {
return clusterId;
}
String originalAlias = alias.substring(0, alias.length() - Utils.SEPARATE_TOPIC_SUFFIX.length());
return kafkaClusterAliasToIdMap.getInt(originalAlias);
}
}
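A rough, self-contained sketch of the "_sep" convention described in the comment and resolver above. The suffix handling is an assumption inferred from this diff; the class name, constants, and sample addresses below are illustrative and not the actual Utils implementation.

import java.util.HashMap;
import java.util.Map;

public class SepTopicResolutionSketch {
  // Assumed to mirror Utils.SEPARATE_TOPIC_SUFFIX.
  static final String SEPARATE_TOPIC_SUFFIX = "_sep";

  // Assumed behavior of Utils.resolveKafkaUrlForSepTopic: strip the suffix when present.
  static String resolveKafkaUrlForSepTopic(String url) {
    return url.endsWith(SEPARATE_TOPIC_SUFFIX)
        ? url.substring(0, url.length() - SEPARATE_TOPIC_SUFFIX.length())
        : url;
  }

  public static void main(String[] args) {
    // Duplicated alias entries, as in the sample <entry> block in the Javadoc above.
    Map<Integer, String> idToAlias = new HashMap<>();
    idToAlias.put(1, "region1");
    idToAlias.put(2, "region1_sep");
    Map<String, Integer> aliasToId = new HashMap<>();
    idToAlias.forEach((id, alias) -> aliasToId.put(alias, id));

    // The "_sep" URL resolves back to the physical broker address.
    System.out.println(resolveKafkaUrlForSepTopic("localhost:1_sep")); // localhost:1

    // The "_sep" cluster id resolves back to the original id, mirroring
    // getEquivalentKafkaClusterIdForSepTopic.
    int clusterId = 2;
    String alias = idToAlias.get(clusterId);
    int equivalentId = alias.endsWith(SEPARATE_TOPIC_SUFFIX)
        ? aliasToId.get(alias.substring(0, alias.length() - SEPARATE_TOPIC_SUFFIX.length()))
        : clusterId;
    System.out.println(equivalentId); // 1
  }
}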
@@ -264,6 +264,7 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
AbstractKafkaConsumerService getKafkaConsumerService(final String kafkaURL) {
AbstractKafkaConsumerService consumerService = kafkaServerToConsumerServiceMap.get(kafkaURL);
if (consumerService == null && kafkaClusterUrlResolver != null) {
// The resolver is needed to map the special ("_sep"-suffixed) form of the kafka URL back to the original kafka URL
consumerService = kafkaServerToConsumerServiceMap.get(kafkaClusterUrlResolver.apply(kafkaURL));
}
return consumerService;
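A minimal sketch of the two-step lookup above, assuming the map is keyed by resolved (suffix-free) URLs and the resolver strips a trailing "_sep"; the consumer service is stood in for by a plain String purely for illustration.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ConsumerServiceLookupSketch {
  public static void main(String[] args) {
    // Keyed by the resolved broker URL, standing in for kafkaServerToConsumerServiceMap.
    Map<String, String> kafkaServerToConsumerServiceMap = new HashMap<>();
    kafkaServerToConsumerServiceMap.put("localhost:1", "consumer-service-for-region_1");

    // Assumed resolver behavior: strip the "_sep" suffix.
    Function<String, String> kafkaClusterUrlResolver =
        url -> url.endsWith("_sep") ? url.substring(0, url.length() - "_sep".length()) : url;

    String kafkaURL = "localhost:1_sep";
    String consumerService = kafkaServerToConsumerServiceMap.get(kafkaURL);
    if (consumerService == null && kafkaClusterUrlResolver != null) {
      // Second lookup with the resolved URL succeeds for the separate incremental push topic.
      consumerService = kafkaServerToConsumerServiceMap.get(kafkaClusterUrlResolver.apply(kafkaURL));
    }
    System.out.println(consumerService); // consumer-service-for-region_1
  }
}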
@@ -391,8 +392,8 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L
new StorePartitionDataReceiver(
storeIngestionTask,
pubSubTopicPartition,
kafkaURL,
kafkaClusterUrlToIdMap.getOrDefault(kafkaURL, -1));
kafkaURL, // do not resolve; let it pass downstream for offset tracking purposes
kafkaClusterUrlToIdMap.getOrDefault(kafkaURL, -1)); // same pubsub url but a different id for the sep topic

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);
consumerService.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, lastOffset, dataReceiver);
@@ -2238,20 +2238,24 @@ protected void recordHeartbeatReceived(
return;
}

// Separate incremental push pubsub entries have the same pubsub url but a different cluster id, which creates
// confusion for heartbeat tracking. We need to resolve the kafka url to the actual kafka cluster url.
String resolvedKafkaUrl = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl;
if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
heartbeatMonitoringService.recordLeaderHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
} else {
heartbeatMonitoringService.recordFollowerHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
}
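The effect of resolving the URL before the alias lookup can be sketched as follows, assuming the url-to-alias map carries both the base and "_sep" entries as in the sample cluster map; resolving first means heartbeats from the separate incremental push topic roll up under the base region alias (all names and addresses are illustrative).

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HeartbeatRegionResolutionSketch {
  public static void main(String[] args) {
    Map<String, String> kafkaClusterUrlToAliasMap = new HashMap<>();
    kafkaClusterUrlToAliasMap.put("localhost:1", "region_1");
    kafkaClusterUrlToAliasMap.put("localhost:1_sep", "region_1_sep");

    // Assumed resolver behavior: strip the "_sep" suffix.
    Function<String, String> kafkaClusterUrlResolver =
        url -> url.endsWith("_sep") ? url.substring(0, url.length() - "_sep".length()) : url;

    String kafkaUrl = "localhost:1_sep";
    String resolvedKafkaUrl =
        kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl;

    // Without resolution the heartbeat would be attributed to "region_1_sep";
    // with resolution it is recorded under the real region "region_1".
    System.out.println(kafkaClusterUrlToAliasMap.get(kafkaUrl));         // region_1_sep
    System.out.println(kafkaClusterUrlToAliasMap.get(resolvedKafkaUrl)); // region_1
  }
}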
@@ -2421,12 +2425,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(

// If we are here the message must be produced to local kafka or silently consumed.
LeaderProducedRecordContext leaderProducedRecordContext;

// No need to resolve the cluster id and kafka url here, because sep topics are real-time topics, not VT
validateRecordBeforeProducingToLocalKafka(consumerRecord, partitionConsumptionState, kafkaUrl, kafkaClusterId);

if (consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
recordRegionHybridConsumptionStats(
kafkaClusterId,
// convert the cluster id back to the original cluster id for monitoring purposes
serverConfig.getEquivalentKafkaClusterIdForSepTopic(kafkaClusterId),
consumerRecord.getPayloadSize(),
consumerRecord.getOffset(),
beforeProcessingBatchRecordsTimestampMs);
@@ -2449,7 +2455,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
switch (controlMessageType) {
case START_OF_PUSH:
/**
* N.B.: This is expected to be the first time time we call {@link veniceWriter#get()}. During this time
* N.B.: This is expected to be the first time we call {@link veniceWriter#get()}. During this time
* since startOfPush has not been processed yet, {@link StoreVersionState} is not created yet (unless
* this is a server restart scenario). So the {@link com.linkedin.venice.writer.VeniceWriter#isChunkingEnabled} field
* will not be set correctly at this point. This is fine as chunking is mostly not applicable for control messages.
@@ -2759,6 +2765,8 @@ private void validateRecordBeforeProducingToLocalKafka(
int kafkaClusterId) {
// Check whether the message is from local version topic; leader shouldn't consume from local VT and then produce
// back to VT again
// localKafkaClusterId will always be the regular one without the "_sep" suffix, so kafkaClusterId should be
// converted for comparison. Likewise for the kafkaUrl.
if (kafkaClusterId == localKafkaClusterId
&& consumerRecord.getTopicPartition().getPubSubTopic().equals(this.versionTopic)
&& kafkaUrl.equals(this.localKafkaServer)) {
@@ -1872,6 +1872,8 @@ protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch ori
}
List<CharSequence> returnSet = new ArrayList<>(originalTopicSwitch.sourceKafkaServers.size());
for (CharSequence url: originalTopicSwitch.sourceKafkaServers) {
// For the separate incremental push topic, the un-resolved URL is not a valid pubsub URL, so it needs to be resolved.
// There's no issue for TS, as we do the topic switch for both the real-time topic and the separate incremental push topic.
returnSet.add(kafkaClusterUrlResolver.apply(url.toString()));
}
originalTopicSwitch.sourceKafkaServers = returnSet;
@@ -3423,12 +3425,16 @@ public void consumerBatchUnsubscribe(Set<PubSubTopicPartition> topicPartitionSet
public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState);

public void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, long startOffset, String kafkaURL) {
final boolean consumeRemotely = !Objects.equals(kafkaURL, localKafkaServer);
// TODO: pass the special format of kafka url as input here when subscribing to the separate incremental push topic
String resolvedKafkaURL = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaURL) : kafkaURL;
final boolean consumeRemotely = !Objects.equals(resolvedKafkaURL, localKafkaServer);
// TODO: Move remote KafkaConsumerService creating operations into the aggKafkaConsumerService.
aggKafkaConsumerService
.createKafkaConsumerService(createKafkaConsumerProperties(kafkaProps, kafkaURL, consumeRemotely));
.createKafkaConsumerService(createKafkaConsumerProperties(kafkaProps, resolvedKafkaURL, consumeRemotely));
PartitionReplicaIngestionContext partitionReplicaIngestionContext =
new PartitionReplicaIngestionContext(versionTopic, pubSubTopicPartition, versionRole, workloadType);
// localKafkaServer doesn't have the suffix but kafkaURL may have it, and we don't want to pass the
// resolvedKafkaURL here, as the URL will be passed to the data receiver for parsing the cluster id
aggKafkaConsumerService.subscribeConsumerFor(kafkaURL, this, partitionReplicaIngestionContext, startOffset);
}
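The two URL forms used in consumerSubscribe can be sketched as below, assuming the "_sep" suffix convention: the resolved URL drives consumer property creation and the remote-vs-local check, while the original (possibly suffixed) URL is kept for the subscription so the data receiver can still derive the separate-topic cluster id. Addresses and suffix handling here are illustrative assumptions.

import java.util.Objects;

public class ConsumerSubscribeUrlSketch {
  public static void main(String[] args) {
    String localKafkaServer = "localhost:0";
    String kafkaURL = "localhost:1_sep"; // URL handed to consumerSubscribe for the sep topic

    // Assumed resolver behavior: strip the "_sep" suffix to get the physical broker address.
    String resolvedKafkaURL = kafkaURL.endsWith("_sep")
        ? kafkaURL.substring(0, kafkaURL.length() - "_sep".length())
        : kafkaURL;

    // Consumer properties and the remote-vs-local check use the physical broker address.
    boolean consumeRemotely = !Objects.equals(resolvedKafkaURL, localKafkaServer);

    // The subscription keeps the suffixed URL so downstream offset/cluster-id tracking
    // still distinguishes the separate incremental push topic.
    System.out.println("consumeRemotely=" + consumeRemotely
        + ", subscribeWith=" + kafkaURL + ", physicalBroker=" + resolvedKafkaURL);
  }
}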

@@ -4126,6 +4132,9 @@ public void updateOffsetMetadataAndSync(String topic, int partitionId) {
* @return topic manager
*/
protected TopicManager getTopicManager(String sourceKafkaServer) {
if (kafkaClusterUrlResolver != null) {
sourceKafkaServer = kafkaClusterUrlResolver.apply(sourceKafkaServer);
}
if (sourceKafkaServer.equals(localKafkaServer)) {
// Use default kafka admin client (could be scala or java based) to get local topic manager
return topicManagerRepository.getLocalTopicManager();
@@ -135,4 +135,9 @@ public String toString() {
return this.getClass().getSimpleName() + "{" + "VT=" + storeIngestionTask.getVersionTopic() + ", topicPartition="
+ topicPartition + '}';
}

// for testing purposes only
int getKafkaClusterId() {
return this.kafkaClusterId;
}
}
@@ -0,0 +1,56 @@
package com.linkedin.davinci.config;

import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class VeniceClusterConfigTest {
VeniceClusterConfig config;
static Map<String, Map<String, String>> KAFKA_CLUSTER_MAP;
static {
KAFKA_CLUSTER_MAP = new HashMap<>();
for (int i = 0; i < 3; i++) {
Map<String, String> entry = new HashMap<>();
entry.put("name", "region_" + i);
entry.put("url", "localhost:" + i);
KAFKA_CLUSTER_MAP.put(String.valueOf(i), entry);
}

// additional mapping for sep topics
for (int i = 3; i < 6; i++) {
int clusterId = i % 3;
Map<String, String> entry = new HashMap<>();
entry.put("name", "region_" + clusterId + "_sep");
entry.put("url", "localhost:" + clusterId + "_sep");
KAFKA_CLUSTER_MAP.put(String.valueOf(i), entry);
}
}

@BeforeMethod
public void setUp() {
Properties props = new Properties();
props.setProperty(CLUSTER_NAME, "test_cluster");
props.setProperty(ZOOKEEPER_ADDRESS, "fake_zk_addr");
props.setProperty(KAFKA_BOOTSTRAP_SERVERS, "fake_kafka_addr");
config = new VeniceClusterConfig(new VeniceProperties(props), KAFKA_CLUSTER_MAP);
}

@Test
public void testGetEquivalentKafkaClusterIdForSepTopic() {
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(0), 0);
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(1), 1);
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(2), 2);
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(3), 0);
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(4), 1);
Assert.assertEquals(config.getEquivalentKafkaClusterIdForSepTopic(5), 2);
}
}
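In this cluster map, entries 3 through 5 reuse the base region aliases with a "_sep" suffix (for example "region_0_sep"), so getEquivalentKafkaClusterIdForSepTopic strips the suffix and looks the base alias back up in kafkaClusterAliasToIdMap, which is why the assertions expect cluster ids 3, 4, and 5 to resolve to 0, 1, and 2.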