Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,17 @@ private ConfigKeys() {
*/
public static final String KAFKA_REPLICATION_FACTOR_RT_TOPICS = "kafka.replication.factor.rt.topics";

/**
* Disable unclean leader election for real-time buffer topics.
*
* If set to false, unclean leader election will be disabled for RT topics,
* which prevents data loss at the cost of potential unavailability.
*
* Will use the Kafka cluster's default if not set.
*/
public static final String KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS =
"kafka.unclean.leader.election.enable.rt.topics";

/**
* Cluster-level config to enable active-active replication for new hybrid stores.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,36 @@ public class PubSubTopicConfiguration implements Cloneable {
Long minLogCompactionLagMs;
Optional<Long> maxLogCompactionLagMs;
Optional<Integer> minInSyncReplicas;
Optional<Boolean> uncleanLeaderElectionEnable;

public PubSubTopicConfiguration(
Optional<Long> retentionInMs,
boolean isLogCompacted,
Optional<Integer> minInSyncReplicas,
Long minLogCompactionLagMs,
Optional<Long> maxLogCompactionLagMs) {
this(
retentionInMs,
isLogCompacted,
minInSyncReplicas,
minLogCompactionLagMs,
maxLogCompactionLagMs,
Optional.empty());
}

public PubSubTopicConfiguration(
Optional<Long> retentionInMs,
boolean isLogCompacted,
Optional<Integer> minInSyncReplicas,
Long minLogCompactionLagMs,
Optional<Long> maxLogCompactionLagMs,
Optional<Boolean> uncleanLeaderElectionEnable) {
this.retentionInMs = retentionInMs;
this.isLogCompacted = isLogCompacted;
this.minInSyncReplicas = minInSyncReplicas;
this.minLogCompactionLagMs = minLogCompactionLagMs;
this.maxLogCompactionLagMs = maxLogCompactionLagMs;
this.uncleanLeaderElectionEnable = uncleanLeaderElectionEnable;
}

/**
Expand Down Expand Up @@ -93,15 +111,30 @@ public void setMaxLogCompactionLagMs(Optional<Long> maxLogCompactionLagMs) {
this.maxLogCompactionLagMs = maxLogCompactionLagMs;
}

/**
* @return whether unclean leader election is enabled for this topic
*/
public Optional<Boolean> getUncleanLeaderElectionEnable() {
return uncleanLeaderElectionEnable;
}

/**
* @param uncleanLeaderElectionEnable whether unclean leader election is enabled for this topic
*/
public void setUncleanLeaderElectionEnable(Optional<Boolean> uncleanLeaderElectionEnable) {
this.uncleanLeaderElectionEnable = uncleanLeaderElectionEnable;
}

@Override
public String toString() {
return String.format(
"TopicConfiguration(retentionInMs = %s, isLogCompacted = %s, minInSyncReplicas = %s, minLogCompactionLagMs = %s, maxLogCompactionLagMs = %s)",
"TopicConfiguration(retentionInMs = %s, isLogCompacted = %s, minInSyncReplicas = %s, minLogCompactionLagMs = %s, maxLogCompactionLagMs = %s, uncleanLeaderElectionEnable = %s)",
retentionInMs.isPresent() ? retentionInMs.get() : "not set",
isLogCompacted,
minInSyncReplicas.isPresent() ? minInSyncReplicas.get() : "not set",
minLogCompactionLagMs,
maxLogCompactionLagMs.isPresent() ? maxLogCompactionLagMs.get() : " not set");
maxLogCompactionLagMs.isPresent() ? maxLogCompactionLagMs.get() : "not set",
uncleanLeaderElectionEnable.isPresent() ? uncleanLeaderElectionEnable.get() : "not set");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,16 @@ private PubSubTopicConfiguration marshallProperties(Config config) {
Optional<Long> maxLogCompactionLagMs = properties.containsKey(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
? Optional.of(Long.parseLong(properties.getProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)))
: Optional.empty();
Optional<Boolean> uncleanLeaderElectionEnable =
Optional.ofNullable(properties.getProperty(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG))
.map(Boolean::parseBoolean);
return new PubSubTopicConfiguration(
retentionMs,
isLogCompacted,
minInSyncReplicas,
minLogCompactionLagMs,
maxLogCompactionLagMs);
maxLogCompactionLagMs,
uncleanLeaderElectionEnable);
}

private Properties unmarshallProperties(PubSubTopicConfiguration pubSubTopicConfiguration) {
Expand All @@ -433,6 +437,10 @@ private Properties unmarshallProperties(PubSubTopicConfiguration pubSubTopicConf
.ifPresent(
minIsrConfig -> topicProperties
.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Integer.toString(minIsrConfig)));
pubSubTopicConfiguration.getUncleanLeaderElectionEnable()
.ifPresent(
uncleanLeaderElection -> topicProperties
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting used here. unmarshallProperties is used in the createTopic method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only set if setting is present. This will make sure we use cluster configs by default.

.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(uncleanLeaderElection)));
// Just in case the Kafka cluster isn't configured as expected.
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString());
return topicProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,42 @@ public void createTopic(
boolean logCompaction,
Optional<Integer> minIsr,
boolean useFastPubSubOperationTimeout) {
createTopic(
topicName,
numPartitions,
replication,
retentionTimeMs,
logCompaction,
minIsr,
Optional.empty(),
useFastPubSubOperationTimeout);
}

/**
* Create a topic, and block until the topic is created, with a default timeout of
* {@value PubSubConstants#PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE}, after which this function will throw a VeniceException.
*
* @param topicName Name for the new topic
* @param numPartitions number of partitions
* @param replication replication factor
* @param retentionTimeMs Retention time, in ms, for the topic
* @param logCompaction whether to enable log compaction on the topic
* @param minIsr if present, will apply the specified min.isr to this topic,
* if absent, PubSub cluster defaults will be used
* @param uncleanLeaderElectionEnable if present, will apply the specified unclean.leader.election.enable to this topic,
* if absent, PubSub cluster defaults will be used
* @param useFastPubSubOperationTimeout if false, normal PubSub operation timeout will be used,
* if true, a much shorter timeout will be used to make topic creation non-blocking.
*/
public void createTopic(
PubSubTopic topicName,
int numPartitions,
int replication,
long retentionTimeMs,
boolean logCompaction,
Optional<Integer> minIsr,
Optional<Boolean> uncleanLeaderElectionEnable,
boolean useFastPubSubOperationTimeout) {
long startTimeMs = System.currentTimeMillis();
long deadlineMs = startTimeMs + (useFastPubSubOperationTimeout
? PUBSUB_FAST_OPERATION_TIMEOUT_MS
Expand All @@ -197,7 +233,8 @@ public void createTopic(
logCompaction,
minIsr,
topicManagerContext.getTopicMinLogCompactionLagMs(),
Optional.empty());
Optional.empty(),
uncleanLeaderElectionEnable);
logger.info(
"Creating topic: {} partitions: {} replication: {}, configuration: {}",
topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_OVER_SSL;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR_RT_TOPICS;
import static com.linkedin.venice.ConfigKeys.KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS;
import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;
import static com.linkedin.venice.ConfigKeys.LEAKED_PUSH_STATUS_CLEAN_UP_SERVICE_SLEEP_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.LEAKED_RESOURCE_ALLOWED_LINGER_TIME_MS;
Expand Down Expand Up @@ -561,6 +562,7 @@ public class VeniceControllerClusterConfig {
private final Optional<Integer> minInSyncReplicas;
private final Optional<Integer> minInSyncReplicasRealTimeTopics;
private final Optional<Integer> minInSyncReplicasAdminTopics;
private final Optional<Boolean> uncleanLeaderElectionEnableRTTopics;
private final boolean kafkaLogCompactionForHybridStores;

/**
Expand Down Expand Up @@ -699,6 +701,9 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.minInSyncReplicas = props.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS);
this.minInSyncReplicasRealTimeTopics = props.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS_RT_TOPICS);
this.minInSyncReplicasAdminTopics = props.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS_ADMIN_TOPICS);
this.uncleanLeaderElectionEnableRTTopics = props.containsKey(KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS)
? Optional.of(props.getBoolean(KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS))
: Optional.empty();
this.kafkaLogCompactionForHybridStores = props.getBoolean(KAFKA_LOG_COMPACTION_FOR_HYBRID_STORES, true);
this.replicationFactor = props.getInt(DEFAULT_REPLICA_FACTOR);
this.minNumberOfPartitions = props.getInt(DEFAULT_NUMBER_OF_PARTITION);
Expand Down Expand Up @@ -1503,6 +1508,10 @@ public Optional<Integer> getMinInSyncReplicasAdminTopics() {
return minInSyncReplicasAdminTopics;
}

public Optional<Boolean> getUncleanLeaderElectionEnableRTTopics() {
return uncleanLeaderElectionEnableRTTopics;
}

public boolean isKafkaLogCompactionForHybridStoresEnabled() {
return kafkaLogCompactionForHybridStores;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3584,6 +3584,7 @@ void createOrUpdateRealTimeTopic(String clusterName, Store store, Version versio
false,
// Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck
clusterConfig.getMinInSyncReplicasRealTimeTopics(),
clusterConfig.getUncleanLeaderElectionEnableRTTopics(),
false);
}
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_MIN_IN_SYNC_REPLICAS_RT_TOPICS;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR_RT_TOPICS;
import static com.linkedin.venice.ConfigKeys.KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS;
import static com.linkedin.venice.VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.TOPIC_SWITCH;
import static com.linkedin.venice.pubsub.PubSubConstants.DEFAULT_KAFKA_REPLICATION_FACTOR;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class RealTimeTopicSwitcher {
private final int kafkaReplicationFactorForRTTopics;
private final int kafkaReplicationFactor;
private final Optional<Integer> minSyncReplicasForRTTopics;
private final Optional<Boolean> uncleanLeaderElectionEnableForRTTopics;

private final PubSubTopicRepository pubSubTopicRepository;

Expand All @@ -71,6 +73,10 @@ public RealTimeTopicSwitcher(
this.kafkaReplicationFactorForRTTopics =
veniceProperties.getInt(KAFKA_REPLICATION_FACTOR_RT_TOPICS, kafkaReplicationFactor);
this.minSyncReplicasForRTTopics = veniceProperties.getOptionalInt(KAFKA_MIN_IN_SYNC_REPLICAS_RT_TOPICS);
this.uncleanLeaderElectionEnableForRTTopics =
veniceProperties.containsKey(KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS)
? Optional.of(veniceProperties.getBoolean(KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS))
: Optional.empty();
}

/**
Expand Down Expand Up @@ -155,13 +161,16 @@ void createRealTimeTopicIfNeeded(Store store, Version version, PubSubTopic realT
}
int replicationFactor = realTimeTopic.isRealTime() ? kafkaReplicationFactorForRTTopics : kafkaReplicationFactor;
Optional<Integer> minISR = realTimeTopic.isRealTime() ? minSyncReplicasForRTTopics : Optional.empty();
Optional<Boolean> uncleanLeaderElection =
realTimeTopic.isRealTime() ? uncleanLeaderElectionEnableForRTTopics : Optional.empty();
getTopicManager().createTopic(
realTimeTopic,
partitionCount,
replicationFactor,
StoreUtils.getExpectedRetentionTimeInMs(store, store.getHybridStoreConfig()),
false,
minISR,
uncleanLeaderElection,
false);
} else {
/**
Expand Down
Loading