[server][dvc] Remove KafkaAdminClass references and merge KafkaSSLUtils into ApacheKafkaUtils #1519

Status: Merged (5 commits, Feb 13, 2025). The diffs below show the changes from all commits.
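
In short, this PR does three things:

1. Removes the `KAFKA_ADMIN_CLASS`, `KAFKA_READ_ONLY_ADMIN_CLASS`, and `KAFKA_WRITE_ONLY_ADMIN_CLASS` config keys, along with the `VeniceServerConfig` fields and getters that carried them.
2. Moves the Kafka SSL/protocol helpers from `com.linkedin.venice.utils.KafkaSSLUtils` into `com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils`.
3. Generalizes `VeniceProperties.clipAndFilterNamespace` to accept a set of namespaces, with new unit tests.

For callers of the SSL helpers, the migration is a package/class rename, as in this sketch reconstructed from the import changes below (argument names are illustrative, not copied from the source):

```java
// Before this PR (reconstructed from the removed imports in the diffs below):
//   com.linkedin.venice.utils.KafkaSSLUtils.isKafkaProtocolValid(protocolName);
//   com.linkedin.venice.utils.KafkaSSLUtils.isKafkaSSLProtocol(protocolName);
//   com.linkedin.venice.utils.KafkaSSLUtils.validateAndCopyKafkaSSLConfig(src, dest);

// After this PR:
//   com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils.isKafkaProtocolValid(protocolName);
//   com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils.isKafkaSSLProtocol(protocolName);
//   com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils.validateAndCopyKafkaSSLConfig(src, dest);
```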
File 1: Da Vinci backend (`buildVeniceConfig()`)

```diff
@@ -13,7 +13,6 @@
 import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
 import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
 import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
-import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
 import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;
@@ -48,7 +47,6 @@
 import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
 import com.linkedin.venice.meta.Store;
 import com.linkedin.venice.meta.Version;
-import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
 import com.linkedin.venice.schema.SchemaReader;
 import com.linkedin.venice.schema.SchemaRepoBackedSchemaReader;
 import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
@@ -707,22 +705,21 @@ private VeniceConfigLoader buildVeniceConfig() {
       recordTransformerOutputValueSchema = Objects.toString(recordTransformerConfig.getOutputValueSchema(), "null");
     }
 
-    VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
+    VeniceProperties config = new PropertyBuilder().put(CLUSTER_NAME, clusterName)
         .put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
         .put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
         .put(ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER, 36) // RocksDB default config
         .put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER_WRITE_ONLY_VERSION, 40)
         .put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER_WRITE_ONLY_VERSION, 60)
         .put(ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION, 80)
-        .put(CLUSTER_NAME, clusterName)
         .put(ZOOKEEPER_ADDRESS, zkAddress)
         .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
         .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
         .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
         .put(INGESTION_USE_DA_VINCI_CLIENT, true)
         .put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema)
-        .put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
-        // in Isolated Process
+        // Explicitly disable memory limiter in Isolated Process
+        .put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1)
         .put(backendConfig.toProperties())
         .build();
     logger.info("backendConfig=" + config.toString(true));
```
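
The `INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT` key above illustrates the namespace convention that `clipAndFilterNamespace` (reworked later in this PR) exists to serve: settings intended for the isolated ingestion process are stored under a prefix and clipped back out on the other side. A hedged illustration; the literal constant values are assumptions, not taken from the source:

```java
// Assumed values for illustration only (not taken from this PR):
//   INGESTION_ISOLATION_CONFIG_PREFIX = "isolated.ingestion"
//   INGESTION_MEMORY_LIMIT            = "ingestion.memory.limit"
// The builder above would then store:
//   "isolated.ingestion.ingestion.memory.limit" -> -1
// and the isolated process could recover its own view with something like:
//   VeniceProperties forIsolated = config.clipAndFilterNamespace("isolated.ingestion");
// after which "ingestion.memory.limit" -> -1 explicitly disables its memory limiter.
```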
File 2: `VeniceClusterConfig`

```diff
@@ -27,8 +27,8 @@
 import com.linkedin.venice.exceptions.ConfigurationException;
 import com.linkedin.venice.exceptions.UndefinedPropertyException;
 import com.linkedin.venice.meta.PersistenceType;
+import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
 import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
-import com.linkedin.venice.utils.KafkaSSLUtils;
 import com.linkedin.venice.utils.RegionUtils;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
@@ -137,7 +137,7 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
 
     String kafkaSecurityProtocolString =
         clusterProps.getString(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name());
-    if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
+    if (!ApacheKafkaUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
       throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
     }
     this.kafkaSecurityProtocol = PubSubSecurityProtocol.forName(kafkaSecurityProtocolString);
@@ -240,10 +240,10 @@ public VeniceClusterConfig(VeniceProperties clusterProps, Map<String, Map<String
     }
     this.kafkaClusterUrlToAliasMap = Collections.unmodifiableMap(tmpKafkaClusterUrlToAliasMap);
 
-    if (!KafkaSSLUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
+    if (!ApacheKafkaUtils.isKafkaProtocolValid(kafkaSecurityProtocolString)) {
       throw new ConfigurationException("Invalid kafka security protocol: " + kafkaSecurityProtocolString);
     }
-    if (KafkaSSLUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
+    if (ApacheKafkaUtils.isKafkaSSLProtocol(kafkaSecurityProtocolString)
         || kafkaBootstrapUrlToSecurityProtocol.containsValue(PubSubSecurityProtocol.SSL)) {
       this.sslConfig = Optional.of(new SSLConfig(clusterProps));
     } else {
```
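
The validation pattern in `VeniceClusterConfig` above is the main consumer of the relocated helpers. A minimal standalone sketch of the same pattern (not code from this PR; `"SSL"` is an illustrative config value):

```java
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;

public class ProtocolCheckExample {
  public static void main(String[] args) {
    String configured = "SSL";
    // Reject anything that is not a recognized Kafka security protocol.
    if (!ApacheKafkaUtils.isKafkaProtocolValid(configured)) {
      throw new IllegalArgumentException("Invalid kafka security protocol: " + configured);
    }
    // Parse the validated name into the pub-sub-agnostic enum.
    PubSubSecurityProtocol protocol = PubSubSecurityProtocol.forName(configured);
    // Only build SSL settings when the protocol actually requires TLS.
    if (ApacheKafkaUtils.isKafkaSSLProtocol(protocol)) {
      System.out.println("SSL protocol selected; an SSLConfig must be provided.");
    }
  }
}
```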
File 3: `VeniceServerConfig`

```diff
@@ -32,10 +32,7 @@
 import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT_STORE_LIST;
 import static com.linkedin.venice.ConfigKeys.INGESTION_MLOCK_ENABLED;
 import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
-import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_METRICS;
-import static com.linkedin.venice.ConfigKeys.KAFKA_READ_ONLY_ADMIN_CLASS;
-import static com.linkedin.venice.ConfigKeys.KAFKA_WRITE_ONLY_ADMIN_CLASS;
 import static com.linkedin.venice.ConfigKeys.KEY_VALUE_PROFILING_ENABLED;
 import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;
 import static com.linkedin.venice.ConfigKeys.LEADER_FOLLOWER_STATE_TRANSITION_THREAD_POOL_STRATEGY;
@@ -196,7 +193,6 @@
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.meta.IngestionMode;
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
-import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
 import com.linkedin.venice.throttle.VeniceRateLimiter;
 import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.Utils;
@@ -435,9 +431,6 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final int computeQueueCapacity;
   private final BlockingQueueType blockingQueueType;
   private final boolean restServiceEpollEnabled;
-  private final String kafkaAdminClass;
-  private final String kafkaWriteOnlyClass;
-  private final String kafkaReadOnlyClass;
   private final long routerConnectionWarmingDelayMs;
   private final boolean helixHybridStoreQuotaEnabled;
   private final long ssdHealthCheckShutdownTimeMs;
@@ -747,9 +740,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
     }
 
     restServiceEpollEnabled = serverProperties.getBoolean(SERVER_REST_SERVICE_EPOLL_ENABLED, false);
-    kafkaAdminClass = serverProperties.getString(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName());
-    kafkaWriteOnlyClass = serverProperties.getString(KAFKA_WRITE_ONLY_ADMIN_CLASS, kafkaAdminClass);
-    kafkaReadOnlyClass = serverProperties.getString(KAFKA_READ_ONLY_ADMIN_CLASS, kafkaAdminClass);
     // Disable it by default, and when router connection warming is enabled, we need to adjust this config.
     routerConnectionWarmingDelayMs = serverProperties.getLong(SERVER_ROUTER_CONNECTION_WARMING_DELAY_MS, 0);
     String sharedConsumerAssignmentStrategyStr = serverProperties.getString(
@@ -1326,18 +1316,6 @@ public boolean isRestServiceEpollEnabled() {
     return restServiceEpollEnabled;
   }
 
-  public String getKafkaAdminClass() {
-    return kafkaAdminClass;
-  }
-
-  public String getKafkaWriteOnlyClass() {
-    return kafkaWriteOnlyClass;
-  }
-
-  public String getKafkaReadOnlyClass() {
-    return kafkaReadOnlyClass;
-  }
-
   public long getRouterConnectionWarmingDelayMs() {
     return routerConnectionWarmingDelayMs;
   }
```
File 4: server pub-sub SSL property wiring (`getPubSubSSLPropertiesFromServerConfig`)

```diff
@@ -56,6 +56,7 @@
 import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
 import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
 import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol;
@@ -77,7 +78,6 @@
 import com.linkedin.venice.utils.ComplementSet;
 import com.linkedin.venice.utils.DaemonThreadFactory;
 import com.linkedin.venice.utils.DiskUsage;
-import com.linkedin.venice.utils.KafkaSSLUtils;
 import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.utils.Pair;
 import com.linkedin.venice.utils.SystemTime;
@@ -1168,7 +1168,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
     }
     properties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapUrls);
     PubSubSecurityProtocol securityProtocol = serverConfig.getKafkaSecurityProtocol(kafkaBootstrapUrls);
-    if (KafkaSSLUtils.isKafkaSSLProtocol(securityProtocol)) {
+    if (ApacheKafkaUtils.isKafkaSSLProtocol(securityProtocol)) {
      Optional<SSLConfig> sslConfig = serverConfig.getSslConfig();
       if (!sslConfig.isPresent()) {
         throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
```
File 5: Hadoop-side Kafka input consumer setup (`getConsumerProperties(JobConf)`)

```diff
@@ -20,6 +20,7 @@
 import com.linkedin.venice.hadoop.ssl.SSLConfigurator;
 import com.linkedin.venice.hadoop.ssl.UserCredentialsFactory;
 import com.linkedin.venice.hadoop.utils.HadoopUtils;
+import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
 import com.linkedin.venice.schema.SchemaReader;
 import com.linkedin.venice.security.SSLFactory;
@@ -28,7 +29,6 @@
 import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.DictionaryUtils;
-import com.linkedin.venice.utils.KafkaSSLUtils;
 import com.linkedin.venice.utils.SslUtils;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.io.IOException;
@@ -55,7 +55,7 @@ public static VeniceProperties getConsumerProperties(JobConf config) {
           veniceProperties.clipAndFilterNamespace(KafkaInputRecordReader.KIF_RECORD_READER_KAFKA_CONFIG_PREFIX)
               .toProperties());
       // Copy the mandatory ssl configs
-      KafkaSSLUtils.validateAndCopyKafkaSSLConfig(veniceProperties, consumerFactoryProperties);
+      ApacheKafkaUtils.validateAndCopyKafkaSSLConfig(veniceProperties, consumerFactoryProperties);
     } catch (IOException e) {
       throw new VeniceException("Could not get user credential for job:" + config.getJobName(), e);
     }
```
File 6: `VeniceProperties`

```diff
@@ -18,6 +18,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 
 public class VeniceProperties implements Serializable {
@@ -80,30 +81,73 @@ private String get(String key) {
   }
 
   /**
-   * This method looks for all properties that begins with the given
-   * namespace. Once those properties are identified it removes
-   * the namespace and returns the properties.
+   * Extracts properties that begin with the specified namespace.
+   * <p>
+   * This method identifies all properties that start with the given namespace, removes the namespace
+   * prefix, and returns the filtered properties.
+   * </p>
+   * <p>
+   * This enables support for dynamic configurations (e.g., required by Pub/Sub clients).
+   * All properties following a namespace-based convention can be extracted and used
+   * by various adapters, such as Pub/Sub Producers or Consumers.
+   * </p>
    *
-   * This enables support of dynamic kafka configurations. All Kafka
-   * Properties can follow an convention of namespace and the properties
-   * are extracted and supplied to the Kafka Producer/Consumer.
-   *
-   * @param nameSpace namespace to look for
-   * @return properties matches a namespace, but after removing the namespace.
+   * @param nameSpace The namespace to filter properties by.
+   * @return A {@link VeniceProperties} instance containing properties with the namespace removed.
    */
   public VeniceProperties clipAndFilterNamespace(String nameSpace) {
+    return clipAndFilterNamespace(Collections.singleton(nameSpace));
+  }
+
+  /**
+   * Extracts properties that begin with any of the specified namespaces.
+   * <p>
+   * This method identifies all properties that start with one of the provided namespaces, removes
+   * the matching namespace prefix, and returns the filtered properties.
+   * </p>
+   * <p>
+   * This supports dynamic configurations for various use cases, including Pub/Sub clients,
+   * messaging systems, and other configurable components. By following a namespace-based convention,
+   * different configurations can be extracted and supplied to respective adapters.
+   * </p>
+   * <p>
+   * Example structure:
+   * </p>
+   * <pre>{@code
+   * "pubsub.kafka.bootstrap.servers"
+   * "pubsub.pulsar.service.url"
+   * "pubsub.warpstream.bucket.url"
+   * }</pre>
+   * <p>
+   * Using this method with namespaces {"pubsub.kafka.", "pubsub.pulsar."} would result in:
+   * </p>
+   * <pre>{@code
+   * "bootstrap.servers"
+   * "service.url"
+   * }</pre>
+   *
+   * @param namespaces A set of namespaces to filter properties by.
+   * @return A {@link VeniceProperties} instance containing properties with the matching namespaces removed.
+   */
+  public VeniceProperties clipAndFilterNamespace(Set<String> namespaces) {
     PropertyBuilder builder = new PropertyBuilder();
-    if (!nameSpace.endsWith(".")) {
-      nameSpace = nameSpace + ".";
-    }
+
+    // Ensure all namespaces end with "."
+    Set<String> formattedNamespaces =
+        namespaces.stream().map(ns -> ns.endsWith(".") ? ns : ns + ".").collect(Collectors.toSet());
 
     for (Map.Entry<String, String> entry: this.props.entrySet()) {
       String key = entry.getKey();
-      if (key.startsWith(nameSpace)) {
-        String extractedKey = key.substring(nameSpace.length());
-        builder.put(extractedKey, this.props.get(key));
+
+      for (String namespace: formattedNamespaces) {
+        if (key.startsWith(namespace)) {
+          String extractedKey = key.substring(namespace.length());
+          builder.put(extractedKey, entry.getValue());
+          break; // Avoid unnecessary checks once a match is found
+        }
       }
     }
+
     return builder.build();
   }
 
```
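
A runnable sketch of the new `Set`-based overload, mirroring the Javadoc example above (the broker URLs are placeholders, and `PropertyBuilder` is assumed to live in `com.linkedin.venice.utils`):

```java
import com.linkedin.venice.utils.PropertyBuilder; // assumed package
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class ClipAndFilterExample {
  public static void main(String[] args) {
    VeniceProperties props = new PropertyBuilder()
        .put("pubsub.kafka.bootstrap.servers", "localhost:9092")
        .put("pubsub.pulsar.service.url", "pulsar://localhost:6650")
        .put("pubsub.warpstream.bucket.url", "s3://example-bucket") // matches no namespace below
        .build();

    // Trailing dots are optional: the implementation appends them when missing.
    Set<String> namespaces = new HashSet<>(Arrays.asList("pubsub.kafka", "pubsub.pulsar."));

    Properties clipped = props.clipAndFilterNamespace(namespaces).toProperties();
    System.out.println(clipped);
    // => {service.url=pulsar://localhost:6650, bootstrap.servers=localhost:9092}
  }
}
```

One subtlety of the `break` in the loop: each key is clipped by whichever namespace matches first, and since the formatted namespaces live in a `HashSet`, a key that matches more than one prefix (say, both `a.` and `a.b.`) is clipped by an iteration-order-dependent choice. The namespaces in the tests below do not overlap in this way.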
File 7: `VeniceProperties` tests

```diff
@@ -4,7 +4,13 @@
 import static org.testng.Assert.assertThrows;
 
 import com.linkedin.venice.exceptions.VeniceException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -40,4 +46,105 @@ public void testGetMapWhenMapIsStringEncoded() {
         .build();
     assertThrows(VeniceException.class, () -> invalidVeniceProperties.getMap("region.to.pubsub.broker.map"));
   }
+
+  @Test
+  public void testClipAndFilterNamespaceTwoNamespaces() {
+    Map<CharSequence, CharSequence> props = new HashMap<>();
+    props.put("kafka.key1", "value1");
+    props.put("pubsub.kafka.key1", "value1");
+    props.put("kafka.key2", "value2");
+    props.put("kafka.key3.key30", "value330");
+    props.put("pubsub.kafka.key3.key30", "value330");
+    props.put("pubsub.kafka.key3.key31", "value331");
+    props.put("bogus.kafka.key3.key32", "value332");
+    VeniceProperties veniceProperties = new VeniceProperties(props);
+
+    VeniceProperties kafkaPrefixProps = veniceProperties.clipAndFilterNamespace("kafka.");
+    Properties kafkaPrefixProperties = kafkaPrefixProps.toProperties();
+    assertEquals(kafkaPrefixProperties.size(), 3);
+    assertEquals(kafkaPrefixProperties.getProperty("key1"), "value1");
+    assertEquals(kafkaPrefixProperties.getProperty("key2"), "value2");
+    assertEquals(kafkaPrefixProperties.getProperty("key3.key30"), "value330");
+
+    VeniceProperties pubsubKafkaPrefixProps = veniceProperties.clipAndFilterNamespace("pubsub.kafka.");
+    Properties pubsubKafkaPrefixProperties = pubsubKafkaPrefixProps.toProperties();
+    assertEquals(pubsubKafkaPrefixProperties.size(), 3);
+    assertEquals(pubsubKafkaPrefixProperties.getProperty("key1"), "value1");
+    assertEquals(pubsubKafkaPrefixProperties.getProperty("key3.key30"), "value330");
+    assertEquals(pubsubKafkaPrefixProperties.getProperty("key3.key31"), "value331");
+
+    // Test both prefixes
+    Set<String> prefixes = new HashSet<>(Arrays.asList("kafka.", "pubsub.kafka."));
+    VeniceProperties bothPrefixProps = veniceProperties.clipAndFilterNamespace(prefixes);
+    Properties bothPrefixProperties = bothPrefixProps.toProperties();
+    assertEquals(bothPrefixProperties.size(), 4);
+    assertEquals(bothPrefixProperties.getProperty("key1"), "value1");
+    assertEquals(bothPrefixProperties.getProperty("key2"), "value2");
+    assertEquals(bothPrefixProperties.getProperty("key3.key30"), "value330");
+    assertEquals(bothPrefixProperties.getProperty("key3.key31"), "value331");
+  }
+
+  @Test
+  public void testClipAndFilterNamespaceSingleNamespace() {
+    Map<CharSequence, CharSequence> props = new HashMap<>();
+    props.put("kafka.key1", "value1");
+    props.put("kafka.key2", "value2");
+    props.put("database.host", "localhost");
+    props.put("database.port", "5432");
+
+    VeniceProperties veniceProperties = new VeniceProperties(props);
+
+    VeniceProperties kafkaProps = veniceProperties.clipAndFilterNamespace("kafka.");
+    Properties kafkaProperties = kafkaProps.toProperties();
+
+    assertEquals(kafkaProperties.size(), 2);
+    assertEquals(kafkaProperties.getProperty("key1"), "value1");
+    assertEquals(kafkaProperties.getProperty("key2"), "value2");
+  }
+
+  @Test
+  public void testClipAndFilterNamespaceNoMatchingProperties() {
+    Map<CharSequence, CharSequence> props = new HashMap<>();
+    props.put("app.config.path", "/usr/local/");
+    props.put("logging.level", "DEBUG");
+
+    VeniceProperties veniceProperties = new VeniceProperties(props);
+
+    VeniceProperties kafkaProps = veniceProperties.clipAndFilterNamespace("kafka.");
+    Properties kafkaProperties = kafkaProps.toProperties();
+
+    assertEquals(kafkaProperties.size(), 0);
+  }
+
+  @Test
+  public void testClipAndFilterNamespaceWithEmptyNamespacesSet() {
+    Map<CharSequence, CharSequence> props = new HashMap<>();
+    props.put("kafka.key1", "value1");
+    props.put("database.host", "localhost");
+
+    VeniceProperties veniceProperties = new VeniceProperties(props);
+
+    VeniceProperties emptyProps = veniceProperties.clipAndFilterNamespace(Collections.emptySet());
+    Properties resultProperties = emptyProps.toProperties();
+
+    assertEquals(resultProperties.size(), 0);
+  }
+
+  @Test
+  public void testClipAndFilterNamespaceAllPropertiesFiltered() {
+    Map<CharSequence, CharSequence> props = new HashMap<>();
+    props.put("kafka.key1", "value1");
+    props.put("kafka.key2", "value2");
+    props.put("kafka.key3", "value3");
+
+    VeniceProperties veniceProperties = new VeniceProperties(props);
+
+    VeniceProperties filteredProps = veniceProperties.clipAndFilterNamespace("kafka.");
+    Properties resultProperties = filteredProps.toProperties();
+
+    assertEquals(resultProperties.size(), 3);
+    assertEquals(resultProperties.getProperty("key1"), "value1");
+    assertEquals(resultProperties.getProperty("key2"), "value2");
+    assertEquals(resultProperties.getProperty("key3"), "value3");
+  }
 }
```