[server] Pass ingestion role change for store version from StoreIngestionTask to AggKafkaConsumerService. (#1034)

This PR introduces a new class, PartitionReplicaIngestionContext, to bundle information about a particular topic partition:

1. Version role: future, current, or backup.
2. Workload type: Active-Active or Write Compute, which by nature requires heavier processing.

It also adds a new config, SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, to turn resubscription on or off (disabled by default).

The StoreIngestionTask thread periodically checks whether the store version's role or its workload type has changed. If either has changed, it triggers resubscription by gracefully unsubscribing and then resubscribing all partitions, so that the underlying AggKafkaConsumerService can pick the corresponding consumer thread pool for them. In the future, once KafkaConsumerService is aware of the store version role during subscription, more information will be passed through PartitionReplicaIngestionContext via aggKafkaConsumerService#subscribe.

In the future, we could also bundle more information into this object and pass it to KafkaConsumerService so that it can make better consumer pool allocations. With proper allocation of consumer thread pools, we could prioritize certain types of traffic (e.g., allocate a larger consumer thread pool for current-version hybrid leader ingestion).

A unit test was added to verify that unsubscription and resubscription happen for both local and remote ingestion, by intentionally triggering a version role change during ingestion.
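For illustration, a minimal sketch of the periodic check described above. The cached fields, the helper name, and the partition loop are assumptions made for readability; the config getter, the PartitionReplicaIngestionContext helpers, and resubscribe(...) are the pieces added by this PR.

// Illustrative sketch only; field and helper names are hypothetical, not the exact implementation.
private void maybeResubscribeOnVersionIngestionContextChange(Store store) throws InterruptedException {
  if (!serverConfig.isResubscriptionTriggeredByVersionIngestionContextChangeEnabled()) {
    return; // The feature is off by default.
  }
  PartitionReplicaIngestionContext.VersionRole latestVersionRole =
      PartitionReplicaIngestionContext.getStoreVersionRole(versionTopic, store);
  PartitionReplicaIngestionContext.WorkloadType latestWorkloadType =
      PartitionReplicaIngestionContext.getWorkloadType(versionTopic, store);
  if (latestVersionRole != cachedVersionRole || latestWorkloadType != cachedWorkloadType) {
    cachedVersionRole = latestVersionRole;
    cachedWorkloadType = latestWorkloadType;
    for (PartitionConsumptionState pcs: partitionConsumptionStateMap.values()) {
      // Graceful unsubscribe followed by subscribe; leaders and followers are handled differently.
      resubscribe(pcs);
    }
  }
}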

Co-authored-by: Hao Xu <[email protected]>
haoxu07 and Hao Xu authored Aug 2, 2024
1 parent 0f250df commit 1cc259a
Showing 18 changed files with 500 additions and 110 deletions.
@@ -110,6 +110,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_REST_SERVICE_EPOLL_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REST_SERVICE_STORAGE_THREAD_NUM;
import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ROCKSDB_STORAGE_CONFIG_CHECK_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_ROUTER_CONNECTION_WARMING_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_SCHEMA_FAST_CLASS_WARMUP_TIMEOUT;
@@ -478,6 +479,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean daVinciCurrentVersionBootstrappingSpeedupEnabled;
private final long daVinciCurrentVersionBootstrappingQuotaRecordsPerSecond;
private final long daVinciCurrentVersionBootstrappingQuotaBytesPerSecond;
private final boolean resubscriptionTriggeredByVersionIngestionContextChangeEnabled;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
@@ -790,6 +792,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getLong(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_RECORDS_PER_SECOND, -1);
daVinciCurrentVersionBootstrappingQuotaBytesPerSecond =
serverProperties.getSizeInBytes(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND, -1);
resubscriptionTriggeredByVersionIngestionContextChangeEnabled =
serverProperties.getBoolean(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, false);
}

long extractIngestionMemoryLimit(
@@ -1415,4 +1419,8 @@ public long getDaVinciCurrentVersionBootstrappingQuotaRecordsPerSecond() {
public long getDaVinciCurrentVersionBootstrappingQuotaBytesPerSecond() {
return daVinciCurrentVersionBootstrappingQuotaBytesPerSecond;
}

public boolean isResubscriptionTriggeredByVersionIngestionContextChangeEnabled() {
return resubscriptionTriggeredByVersionIngestionContextChangeEnabled;
}
}
@@ -30,7 +30,7 @@ public abstract SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(
public abstract long getMaxElapsedTimeMSSinceLastPollInConsumerPool();

public abstract void startConsumptionIntoDataReceiver(
PubSubTopicPartition topicPartition,
PartitionReplicaIngestionContext partitionReplicaIngestionContext,
long lastReadOffset,
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver);

@@ -43,4 +43,5 @@ public abstract long getLatestOffsetBasedOnMetrics(
public abstract Map<PubSubTopicPartition, TopicPartitionIngestionInfo> getIngestionInfoFromConsumer(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition);

}
@@ -376,9 +376,10 @@ void batchUnsubscribeConsumerFor(PubSubTopic versionTopic, Set<PubSubTopicPartit
public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> subscribeConsumerFor(
final String kafkaURL,
StoreIngestionTask storeIngestionTask,
PubSubTopicPartition pubSubTopicPartition,
PartitionReplicaIngestionContext partitionReplicaIngestionContext,
long lastOffset) {
PubSubTopic versionTopic = storeIngestionTask.getVersionTopic();
PubSubTopicPartition pubSubTopicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
AbstractKafkaConsumerService consumerService = getKafkaConsumerService(kafkaURL);
if (consumerService == null) {
throw new VeniceException(
@@ -393,7 +394,7 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L
kafkaClusterUrlToIdMap.getOrDefault(kafkaURL, -1));

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);
consumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, lastOffset, dataReceiver);
consumerService.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, lastOffset, dataReceiver);
TopicManager topicManager = storeIngestionTask.getTopicManager(kafkaURL);

/*
@@ -59,7 +59,7 @@
* c) {@link ConsumerSubscriptionCleaner}
* 2. Receive various calls to interrogate or mutate consumer state, and delegate them to the correct unit, by
* maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably,
* the {@link #startConsumptionIntoDataReceiver(PubSubTopicPartition, long, ConsumedDataReceiver)} function allows the
* the {@link #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver)} function allows the
* caller to start funneling consumed data into a receiver (i.e. into another task).
* 3. Provide a single abstract function that must be overridden by subclasses in order to implement a consumption
* load balancing strategy: {@link #pickConsumerForPartition(PubSubTopic, PubSubTopicPartition)}
@@ -367,10 +367,11 @@ public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {

@Override
public void startConsumptionIntoDataReceiver(
PubSubTopicPartition topicPartition,
PartitionReplicaIngestionContext partitionReplicaIngestionContext,
long lastReadOffset,
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) {
PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier();
PubSubTopicPartition topicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition);

if (consumer == null) {
@@ -120,12 +120,13 @@ public long getMaxElapsedTimeMSSinceLastPollInConsumerPool() {

@Override
public void startConsumptionIntoDataReceiver(
PubSubTopicPartition topicPartition,
PartitionReplicaIngestionContext partitionReplicaIngestionContext,
long lastReadOffset,
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) {
PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier();
getKafkaConsumerService(versionTopic, topicPartition)
.startConsumptionIntoDataReceiver(topicPartition, lastReadOffset, consumedDataReceiver);
PubSubTopicPartition pubSubTopicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
getKafkaConsumerService(versionTopic, pubSubTopicPartition)
.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, lastReadOffset, consumedDataReceiver);
}

@Override
@@ -150,7 +150,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private final AggKafkaConsumerService aggKafkaConsumerService;

/**
* A repository mapping each Kafka Topic to it corresponding Ingestion task responsible
* A repository mapping each Version Topic to its corresponding Ingestion task responsible
* for consuming messages and making changes to the local store accordingly.
*/
private final NavigableMap<String, StoreIngestionTask> topicNameToIngestionTaskMap;
@@ -615,8 +615,10 @@ protected void checkLongRunningTaskState() throws InterruptedException {
*/
partitionConsumptionState.setSkipKafkaMessage(false);
// Subscribe to local Kafka topic
PubSubTopicPartition pubSubTopicPartition =
partitionConsumptionState.getSourceTopicPartition(currentLeaderTopic);
consumerSubscribe(
partitionConsumptionState.getSourceTopicPartition(currentLeaderTopic),
pubSubTopicPartition,
partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(),
localKafkaServer);
}
@@ -1005,7 +1007,7 @@ protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionStat
* note that when the function returns, new messages can be appended to the partition already, so it's not guaranteed
* that this timestamp is from the last message.
*
* See {@link PartitionConsumptionState#latestMessageConsumedTimestampInMs} for details.
* See {@link PartitionConsumptionState#getLatestMessageConsumedTimestampInMs} for details.
*/
private long getLastConsumedMessageTimestamp(int partition) {
// Consumption thread would update the last consumed message timestamp for the corresponding partition.
@@ -1140,6 +1142,7 @@ protected boolean processTopicSwitch(
int partition,
long offset,
PartitionConsumptionState partitionConsumptionState) {

TopicSwitch topicSwitch = (TopicSwitch) controlMessage.controlMessageUnion;
/**
* Currently just check whether the sourceKafkaServers list inside TopicSwitch control message only contains
@@ -3505,6 +3508,61 @@ protected Set<String> maybeSendIngestionHeartbeat() {
return failedPartitions;
}

/**
* Resubscribe operation by passing the new version role and partition role to {@link AggKafkaConsumerService}. The action
* for leader and follower replicas will be handled differently.
*/
@Override
protected void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException {
if (isLeader(partitionConsumptionState)) {
resubscribeAsLeader(partitionConsumptionState);
} else {
resubscribeAsFollower(partitionConsumptionState);
}
}

protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState)
throws InterruptedException {
int partition = partitionConsumptionState.getPartition();
consumerUnSubscribe(versionTopic, partitionConsumptionState);
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(versionTopic, partition),
partitionConsumptionState);
LOGGER.info(
"Follower replica: {} unsubscribe finished for future resubscribe.",
partitionConsumptionState.getReplicaId());
PubSubTopicPartition followerTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
long latestProcessedLocalVersionTopicOffset = partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset();
consumerSubscribe(followerTopicPartition, latestProcessedLocalVersionTopicOffset, localKafkaServer);
LOGGER.info(
"Follower replica: {} resubscribe to offset: {}",
partitionConsumptionState.getReplicaId(),
latestProcessedLocalVersionTopicOffset);
}

protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState) throws InterruptedException {
OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(pubSubTopicRepository);
int partition = partitionConsumptionState.getPartition();
consumerUnSubscribe(leaderTopic, partitionConsumptionState);
PubSubTopicPartition leaderTopicPartition = new PubSubTopicPartitionImpl(leaderTopic, partition);
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(leaderTopic, partition),
partitionConsumptionState);
LOGGER.info(
"Leader replica: {} unsubscribe finished for future resubscribe.",
partitionConsumptionState.getReplicaId());
Set<String> leaderSourceKafkaURLs = getConsumptionSourceKafkaAddress(partitionConsumptionState);
for (String leaderSourceKafkaURL: leaderSourceKafkaURLs) {
long leaderStartOffset = partitionConsumptionState.getLeaderOffset(leaderSourceKafkaURL, pubSubTopicRepository);
consumerSubscribe(leaderTopicPartition, leaderStartOffset, leaderSourceKafkaURL);
LOGGER.info(
"Leader replica: {} resubscribe to offset: {}",
partitionConsumptionState.getReplicaId(),
leaderStartOffset);
}
}

/**
* Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp}
* such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics
@@ -0,0 +1,102 @@
package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* This class wraps the information about the role of a partition replica on the host and passes it to
* {@link AggKafkaConsumerService} to achieve finer granularity of consumer assignment. That information is derived from
* the store version role (future, current or backup), the workload type, and the leader or follower state. Version role
* and workload type are properly managed by {@link StoreIngestionTask} and sent to
* {@link AggKafkaConsumerService}. We could add more information regarding this partition replica in the future if needed.
*/

public class PartitionReplicaIngestionContext {
private static final Logger LOGGER = LogManager.getLogger(PartitionReplicaIngestionContext.class);
private final PubSubTopicPartition pubSubTopicPartition;
private final PubSubTopic versionTopic;
private final VersionRole versionRole;
private final WorkloadType workloadType;

public enum VersionRole {
CURRENT, BACKUP, FUTURE
}

// TODO: Add more workload types if needed; for now we only care about active-active or write-compute workloads.
public enum WorkloadType {
AA_OR_WRITE_COMPUTE, NON_AA_OR_WRITE_COMPUTE
}

public PartitionReplicaIngestionContext(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition,
VersionRole versionRole,
WorkloadType workloadType) {
this.versionTopic = versionTopic;
this.pubSubTopicPartition = pubSubTopicPartition;
this.versionRole = versionRole;
this.workloadType = workloadType;
}

public VersionRole getVersionRole() {
return versionRole;
}

public PubSubTopicPartition getPubSubTopicPartition() {
return pubSubTopicPartition;
}

public PubSubTopic getVersionTopic() {
return versionTopic;
}

public WorkloadType getWorkloadType() {
return workloadType;
}

public static WorkloadType getWorkloadType(PubSubTopic versionTopic, Store store) {
checkVersionTopicStoreOrThrow(versionTopic, store);
int versionNumber = Version.parseVersionFromKafkaTopicName(versionTopic.getName());
Version version = store.getVersion(versionNumber);
if (version == null) {
return WorkloadType.NON_AA_OR_WRITE_COMPUTE;
}
if (store.isWriteComputationEnabled() || version.isActiveActiveReplicationEnabled()) {
return WorkloadType.AA_OR_WRITE_COMPUTE;
}
return WorkloadType.NON_AA_OR_WRITE_COMPUTE;
}

public static VersionRole getStoreVersionRole(PubSubTopic versionTopic, Store store) {
checkVersionTopicStoreOrThrow(versionTopic, store);
int versionNumber = Version.parseVersionFromKafkaTopicName(versionTopic.getName());
int currentVersionNumber = store.getCurrentVersion();
if (currentVersionNumber < versionNumber) {
return VersionRole.FUTURE;
} else if (currentVersionNumber > versionNumber) {
return VersionRole.BACKUP;
} else {
return VersionRole.CURRENT;
}
}

private static void checkVersionTopicStoreOrThrow(PubSubTopic versionTopic, Store store) {
if (store == null) {
LOGGER.error("Invalid store meta-data for {}", versionTopic);
throw new VeniceNoStoreException(versionTopic.getStoreName());
}

if (!store.getName().equals(versionTopic.getStoreName())) {
throw new VeniceException(
"Store name mismatch for store " + store.getName() + " and version topic " + versionTopic);
}
}

}
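To make the intended usage concrete, a hedged sketch of how a subscribe call might assemble this context. The store, versionTopic, partition, kafkaURL, storeIngestionTask, and lastOffset inputs are assumed to come from the ingestion task's own state; the constructor, the two static helpers, and AggKafkaConsumerService#subscribeConsumerFor are the APIs shown in this diff.

// Sketch only; the surrounding variables are assumptions for illustration.
PartitionReplicaIngestionContext.VersionRole versionRole =
    PartitionReplicaIngestionContext.getStoreVersionRole(versionTopic, store);
PartitionReplicaIngestionContext.WorkloadType workloadType =
    PartitionReplicaIngestionContext.getWorkloadType(versionTopic, store);
PartitionReplicaIngestionContext partitionReplicaIngestionContext = new PartitionReplicaIngestionContext(
    versionTopic,
    new PubSubTopicPartitionImpl(versionTopic, partition),
    versionRole,
    workloadType);
// The context travels through AggKafkaConsumerService#subscribeConsumerFor down to the consumer
// service layer, which can later use the version role and workload type to pick a consumer pool.
aggKafkaConsumerService.subscribeConsumerFor(kafkaURL, storeIngestionTask, partitionReplicaIngestionContext, lastOffset);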