Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ subprojects {
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v39', PathValidation.DIRECTORY),
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v94', PathValidation.DIRECTORY)
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v41', PathValidation.DIRECTORY),
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v96', PathValidation.DIRECTORY)
]

def schemaDirs = [sourceDir]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ private ConfigKeys() {
*/
public static final String KAFKA_REPLICATION_FACTOR_RT_TOPICS = "kafka.replication.factor.rt.topics";

/**
* Sets unclean leader election for real-time buffer topics.
*
* Will use the Kafka cluster's default if not set.
*/
public static final String KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE_RT_TOPICS =
"kafka.unclean.leader.election.enable.rt.topics";

/**
* Cluster-level config to enable active-active replication for new hybrid stores.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ public class ControllerApiConstants {

public static final String BLOB_TRANSFER_ENABLED = "blob_transfer_enabled";
public static final String BLOB_TRANSFER_IN_SERVER_ENABLED = "blob_transfer_in_server_enabled";
public static final String UNCLEAN_LEADER_ELECTION_ENABLED_FOR_RT_TOPICS =
"unclean_leader_election_enabled_for_rt_topics";

public static final String HEARTBEAT_TIMESTAMP = "heartbeat_timestamp";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TTL_REPUSH_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNCLEAN_LEADER_ELECTION_ENABLED_FOR_RT_TOPICS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
Expand Down Expand Up @@ -162,6 +163,8 @@ public UpdateStoreQueryParams(StoreInfo srcStore, boolean storeMigrating) {
.setBlobTransferEnabled(srcStore.isBlobTransferEnabled())
.setBlobTransferInServerEnabled(
ConfigCommonUtils.ActivationState.valueOf(srcStore.getBlobTransferInServerEnabled()))
.setUncleanLeaderElectionEnabledForRTTopics(
ConfigCommonUtils.ActivationState.valueOf(srcStore.getUncleanLeaderElectionEnabledForRTTopics()))
.setMaxRecordSizeBytes(srcStore.getMaxRecordSizeBytes())
.setMaxNearlineRecordSizeBytes(srcStore.getMaxNearlineRecordSizeBytes())
.setTargetRegionSwap(srcStore.getTargetRegionSwap())
Expand Down Expand Up @@ -814,6 +817,14 @@ public Optional<String> getBlobTransferInServerEnabled() {
return getString(BLOB_TRANSFER_IN_SERVER_ENABLED);
}

public UpdateStoreQueryParams setUncleanLeaderElectionEnabledForRTTopics(ConfigCommonUtils.ActivationState state) {
return putString(UNCLEAN_LEADER_ELECTION_ENABLED_FOR_RT_TOPICS, state.name());
}

public Optional<String> getUncleanLeaderElectionEnabledForRTTopics() {
return getString(UNCLEAN_LEADER_ELECTION_ENABLED_FOR_RT_TOPICS);
}

public UpdateStoreQueryParams setNearlineProducerCompressionEnabled(boolean compressionEnabled) {
return putBoolean(NEARLINE_PRODUCER_COMPRESSION_ENABLED, compressionEnabled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,16 @@ public String getBlobTransferInServerEnabled() {
return this.delegate.getBlobTransferInServerEnabled();
}

@Override
public String getUncleanLeaderElectionEnabledForRTTopics() {
return this.delegate.getUncleanLeaderElectionEnabledForRTTopics();
}

@Override
public void setUncleanLeaderElectionEnabledForRTTopics(String uncleanLeaderElectionEnabledForRTTopics) {
throw new UnsupportedOperationException("Unclean leader election config is read-only");
}

@Override
public boolean isNearlineProducerCompressionEnabled() {
return delegate.isNearlineProducerCompressionEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ default IntSet getVersionNumbers() {

String getBlobTransferInServerEnabled();

String getUncleanLeaderElectionEnabledForRTTopics();

void setUncleanLeaderElectionEnabledForRTTopics(String uncleanLeaderElectionEnabledForRTTopics);

boolean isNearlineProducerCompressionEnabled();

void setNearlineProducerCompressionEnabled(boolean compressionEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public static StoreInfo fromStore(Store store) {
storeInfo.setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
storeInfo.setBlobTransferEnabled(store.isBlobTransferEnabled());
storeInfo.setBlobTransferInServerEnabled(store.getBlobTransferInServerEnabled());
storeInfo.setUncleanLeaderElectionEnabledForRTTopics(store.getUncleanLeaderElectionEnabledForRTTopics());
storeInfo.setNearlineProducerCompressionEnabled(store.isNearlineProducerCompressionEnabled());
storeInfo.setNearlineProducerCountPerWriter(store.getNearlineProducerCountPerWriter());
storeInfo.setTargetRegionSwap(store.getTargetSwapRegion());
Expand Down Expand Up @@ -361,6 +362,7 @@ public static StoreInfo fromStore(Store store) {

private boolean blobTransferEnabled;
private String blobTransferInServerEnable = ActivationState.NOT_SPECIFIED.name();
private String uncleanLeaderElectionEnabledForRTTopics = ActivationState.NOT_SPECIFIED.name();

private boolean nearlineProducerCompressionEnabled;
private int nearlineProducerCountPerWriter;
Expand Down Expand Up @@ -900,6 +902,14 @@ public String getBlobTransferInServerEnabled() {
return this.blobTransferInServerEnable;
}

public void setUncleanLeaderElectionEnabledForRTTopics(String uncleanLeaderElectionEnabledForRTTopics) {
this.uncleanLeaderElectionEnabledForRTTopics = uncleanLeaderElectionEnabledForRTTopics;
}

public String getUncleanLeaderElectionEnabledForRTTopics() {
return this.uncleanLeaderElectionEnabledForRTTopics;
}

public boolean isNearlineProducerCompressionEnabled() {
return nearlineProducerCompressionEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,16 @@ public String getBlobTransferInServerEnabled() {
return zkSharedStore.getBlobTransferInServerEnabled();
}

@Override
public String getUncleanLeaderElectionEnabledForRTTopics() {
return zkSharedStore.getUncleanLeaderElectionEnabledForRTTopics();
}

@Override
public void setUncleanLeaderElectionEnabledForRTTopics(String uncleanLeaderElectionEnabledForRTTopics) {
throwUnsupportedOperationException("setUncleanLeaderElectionEnabledForRTTopics is not supported in SystemStore");
}

@Override
public void setMaxCompactionLagSeconds(long maxCompactionLagSeconds) {
throwUnsupportedOperationException("setMaxCompactionLagSeconds");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,16 @@ public String getBlobTransferInServerEnabled() {
return this.storeProperties.blobTransferInServerEnabled.toString();
}

@Override
public String getUncleanLeaderElectionEnabledForRTTopics() {
return this.storeProperties.uncleanLeaderElectionEnabledForRTTopics.toString();
}

@Override
public void setUncleanLeaderElectionEnabledForRTTopics(String uncleanLeaderElectionEnabledForRTTopics) {
this.storeProperties.uncleanLeaderElectionEnabledForRTTopics = uncleanLeaderElectionEnabledForRTTopics;
}

@Override
public boolean isNearlineProducerCompressionEnabled() {
return this.storeProperties.nearlineProducerCompressionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,36 @@ public class PubSubTopicConfiguration implements Cloneable {
Long minLogCompactionLagMs;
Optional<Long> maxLogCompactionLagMs;
Optional<Integer> minInSyncReplicas;
Optional<Boolean> uncleanLeaderElectionEnable;

public PubSubTopicConfiguration(
Optional<Long> retentionInMs,
boolean isLogCompacted,
Optional<Integer> minInSyncReplicas,
Long minLogCompactionLagMs,
Optional<Long> maxLogCompactionLagMs) {
this(
retentionInMs,
isLogCompacted,
minInSyncReplicas,
minLogCompactionLagMs,
maxLogCompactionLagMs,
Optional.empty());
}

public PubSubTopicConfiguration(
Optional<Long> retentionInMs,
boolean isLogCompacted,
Optional<Integer> minInSyncReplicas,
Long minLogCompactionLagMs,
Optional<Long> maxLogCompactionLagMs,
Optional<Boolean> uncleanLeaderElectionEnable) {
this.retentionInMs = retentionInMs;
this.isLogCompacted = isLogCompacted;
this.minInSyncReplicas = minInSyncReplicas;
this.minLogCompactionLagMs = minLogCompactionLagMs;
this.maxLogCompactionLagMs = maxLogCompactionLagMs;
this.uncleanLeaderElectionEnable = uncleanLeaderElectionEnable;
}

/**
Expand Down Expand Up @@ -93,15 +111,30 @@ public void setMaxLogCompactionLagMs(Optional<Long> maxLogCompactionLagMs) {
this.maxLogCompactionLagMs = maxLogCompactionLagMs;
}

/**
* @return whether unclean leader election is enabled for this topic
*/
public Optional<Boolean> getUncleanLeaderElectionEnable() {
return uncleanLeaderElectionEnable;
}

/**
* @param uncleanLeaderElectionEnable whether unclean leader election is enabled for this topic
*/
public void setUncleanLeaderElectionEnable(Optional<Boolean> uncleanLeaderElectionEnable) {
this.uncleanLeaderElectionEnable = uncleanLeaderElectionEnable;
}

@Override
public String toString() {
return String.format(
"TopicConfiguration(retentionInMs = %s, isLogCompacted = %s, minInSyncReplicas = %s, minLogCompactionLagMs = %s, maxLogCompactionLagMs = %s)",
"TopicConfiguration(retentionInMs = %s, isLogCompacted = %s, minInSyncReplicas = %s, minLogCompactionLagMs = %s, maxLogCompactionLagMs = %s, uncleanLeaderElectionEnable = %s)",
retentionInMs.isPresent() ? retentionInMs.get() : "not set",
isLogCompacted,
minInSyncReplicas.isPresent() ? minInSyncReplicas.get() : "not set",
minLogCompactionLagMs,
maxLogCompactionLagMs.isPresent() ? maxLogCompactionLagMs.get() : " not set");
maxLogCompactionLagMs.isPresent() ? maxLogCompactionLagMs.get() : "not set",
uncleanLeaderElectionEnable.isPresent() ? uncleanLeaderElectionEnable.get() : "not set");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,16 @@ private PubSubTopicConfiguration marshallProperties(Config config) {
Optional<Long> maxLogCompactionLagMs = properties.containsKey(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
? Optional.of(Long.parseLong(properties.getProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)))
: Optional.empty();
Optional<Boolean> uncleanLeaderElectionEnable =
Optional.ofNullable(properties.getProperty(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG))
.map(Boolean::parseBoolean);
return new PubSubTopicConfiguration(
retentionMs,
isLogCompacted,
minInSyncReplicas,
minLogCompactionLagMs,
maxLogCompactionLagMs);
maxLogCompactionLagMs,
uncleanLeaderElectionEnable);
}

private Properties unmarshallProperties(PubSubTopicConfiguration pubSubTopicConfiguration) {
Expand All @@ -433,6 +437,10 @@ private Properties unmarshallProperties(PubSubTopicConfiguration pubSubTopicConf
.ifPresent(
minIsrConfig -> topicProperties
.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Integer.toString(minIsrConfig)));
pubSubTopicConfiguration.getUncleanLeaderElectionEnable()
.ifPresent(
uncleanLeaderElection -> topicProperties
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting used here. unmarshallProperties is used in the createTopic method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only set if setting is present. This will make sure we use cluster configs by default.

.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(uncleanLeaderElection)));
// Just in case the Kafka cluster isn't configured as expected.
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, TimestampType.LOG_APPEND_TIME.toString());
return topicProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,42 @@ public void createTopic(
boolean logCompaction,
Optional<Integer> minIsr,
boolean useFastPubSubOperationTimeout) {
createTopic(
topicName,
numPartitions,
replication,
retentionTimeMs,
logCompaction,
minIsr,
Optional.empty(),
useFastPubSubOperationTimeout);
}

/**
* Create a topic, and block until the topic is created, with a default timeout of
* {@value PubSubConstants#PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE}, after which this function will throw a VeniceException.
*
* @param topicName Name for the new topic
* @param numPartitions number of partitions
* @param replication replication factor
* @param retentionTimeMs Retention time, in ms, for the topic
* @param logCompaction whether to enable log compaction on the topic
* @param minIsr if present, will apply the specified min.isr to this topic,
* if absent, PubSub cluster defaults will be used
* @param uncleanLeaderElectionEnable if present, will apply the specified unclean.leader.election.enable to this topic,
* if absent, PubSub cluster defaults will be used
* @param useFastPubSubOperationTimeout if false, normal PubSub operation timeout will be used,
* if true, a much shorter timeout will be used to make topic creation non-blocking.
*/
public void createTopic(
PubSubTopic topicName,
int numPartitions,
int replication,
long retentionTimeMs,
boolean logCompaction,
Optional<Integer> minIsr,
Optional<Boolean> uncleanLeaderElectionEnable,
boolean useFastPubSubOperationTimeout) {
long startTimeMs = System.currentTimeMillis();
long deadlineMs = startTimeMs + (useFastPubSubOperationTimeout
? PUBSUB_FAST_OPERATION_TIMEOUT_MS
Expand All @@ -197,7 +233,8 @@ public void createTopic(
logCompaction,
minIsr,
topicManagerContext.getTopicMinLogCompactionLagMs(),
Optional.empty());
Optional.empty(),
uncleanLeaderElectionEnable);
logger.info(
"Creating topic: {} partitions: {} replication: {}, configuration: {}",
topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(94, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(96, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -148,7 +148,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(39, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(41, StoreMetaValue.class),

/*
Value Schema for Parent Controller Metadata system store
Expand Down
Loading
Loading