Skip to content

Commit 7433e05

Browse files
committed
address comments
1 parent 8ff7ebb commit 7433e05

File tree

5 files changed

+30
-20
lines changed

5 files changed

+30
-20
lines changed

core/src/main/scala/kafka/server/KafkaConfig.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
605605
// check controller listener names (they won't appear in listeners when process.roles=broker)
606606
// as well as listeners for occurrences of SSL or SASL_*
607607
if (controllerListenerNames.exists(isSslOrSasl) ||
608-
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
608+
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue => isSslOrSasl(CoreUtils.parseListenerName(listenerValue)))) {
609609
mapValue // don't add default mappings since we found something that is SSL or SASL_*
610610
} else {
611611
// add the PLAINTEXT mappings for all controller listener names that are not explicitly PLAINTEXT

core/src/main/scala/kafka/utils/CoreUtils.scala

+10
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ import org.apache.commons.validator.routines.InetAddressValidator
3232
import org.apache.kafka.common.network.ListenerName
3333
import org.apache.kafka.common.security.auth.SecurityProtocol
3434
import org.apache.kafka.common.utils.Utils
35+
import org.apache.kafka.common.KafkaException
3536
import org.apache.kafka.network.SocketServerConfigs
3637
import org.slf4j.event.Level
3738

3839
import java.util
40+
import java.util.Locale
3941
import scala.jdk.CollectionConverters._
4042

4143
/**
@@ -244,4 +246,12 @@ object CoreUtils {
244246
def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = {
245247
map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int])))
246248
}
249+
250+
def parseListenerName(connectionString: String): String = {
251+
val firstColon = connectionString.indexOf(':')
252+
if (firstColon < 0) {
253+
throw new KafkaException(s"Unable to parse a listener name from $connectionString")
254+
}
255+
connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT)
256+
}
247257
}

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+5-3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ import org.junit.jupiter.api.Assertions._
4646
import org.junit.jupiter.api.Test
4747
import org.junit.jupiter.api.function.Executable
4848

49+
import org.apache.kafka.common.test.{TestUtils => JTestUtils}
50+
4951
import scala.jdk.CollectionConverters._
5052

5153
class KafkaConfigTest {
@@ -594,8 +596,8 @@ class KafkaConfigTest {
594596
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
595597
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
596598
val config = KafkaConfig.fromProps(props)
597-
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
598-
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
599+
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(JTestUtils.endpointToString))
600+
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(JTestUtils.endpointToString))
599601
}
600602

601603
private def listenerListToEndPoints(listenerList: String,
@@ -1180,7 +1182,7 @@ class KafkaConfigTest {
11801182

11811183
val config = KafkaConfig.fromProps(defaults)
11821184
assertEquals(1, config.brokerId)
1183-
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString))
1185+
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
11841186
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
11851187
assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
11861188
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)

server/src/main/java/org/apache/kafka/network/EndPoint.java

-16
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717

1818
package org.apache.kafka.network;
1919

20-
import org.apache.kafka.common.KafkaException;
2120
import org.apache.kafka.common.network.ListenerName;
2221
import org.apache.kafka.common.security.auth.SecurityProtocol;
23-
import org.apache.kafka.common.utils.Utils;
2422

25-
import java.util.Locale;
2623
import java.util.Objects;
2724

2825
public class EndPoint {
@@ -38,24 +35,11 @@ public EndPoint(String host, int port, ListenerName listenerName, SecurityProtoc
3835
this.securityProtocol = securityProtocol;
3936
}
4037

41-
public static String parseListenerName(String connectionString) {
42-
int firstColon = connectionString.indexOf(':');
43-
if (firstColon < 0) {
44-
throw new KafkaException("Unable to parse a listener name from " + connectionString);
45-
}
46-
return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT);
47-
}
48-
4938
public static EndPoint fromPublic(org.apache.kafka.common.Endpoint endpoint) {
5039
return new EndPoint(endpoint.host(), endpoint.port(),
5140
new ListenerName(endpoint.listenerName().get()), endpoint.securityProtocol());
5241
}
5342

54-
public String connectionString() {
55-
String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
56-
return listenerName.value() + "://" + hostport;
57-
}
58-
5943
public org.apache.kafka.common.Endpoint toPublic() {
6044
return new org.apache.kafka.common.Endpoint(listenerName.value(), securityProtocol, host, port);
6145
}

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java

+14
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import org.apache.kafka.common.Node;
2222
import org.apache.kafka.common.TopicPartition;
2323
import org.apache.kafka.common.errors.TimeoutException;
24+
import org.apache.kafka.common.network.ListenerName;
2425
import org.apache.kafka.common.utils.Exit;
2526
import org.apache.kafka.common.utils.Utils;
27+
import org.apache.kafka.network.EndPoint;
2628

2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
@@ -101,6 +103,18 @@ static File tempDirectory() {
101103
return file;
102104
}
103105

106+
/**
107+
* Convert EndPoint to String
108+
*/
109+
public static String endpointToString(EndPoint endPoint) {
110+
String host = endPoint.host();
111+
int port = endPoint.port();
112+
ListenerName listenerName = endPoint.listenerName();
113+
114+
String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
115+
return listenerName.value() + "://" + hostport;
116+
}
117+
104118
/**
105119
* uses default value of 15 seconds for timeout
106120
*/

0 commit comments

Comments
 (0)