Skip to content

Commit 449aa5f

Browse files
committed
move endpoint to server modeul from core
1 parent 645565e commit 449aa5f

File tree

14 files changed

+155
-84
lines changed

14 files changed

+155
-84
lines changed

clients/src/main/java/org/apache/kafka/common/utils/Utils.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,14 @@ public final class Utils {
9494

9595
private Utils() {}
9696

97-
// This matches URIs of formats: host:port and protocol://host:port
97+
// This matches URIs of formats: host:port
9898
// IPv6 is supported with [ip] pattern
9999
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^(?:[0-9a-zA-Z\\-%._]*://)?\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");
100100

101101
private static final Pattern VALID_HOST_CHARACTERS = Pattern.compile("([0-9a-zA-Z\\-%._:]*)");
102102

103+
private static final Pattern PROTOCOL_PATTERN = Pattern.compile("([A-Za-z0-9_]+)://");
104+
103105
// Prints up to 2 decimal digits. Used for human-readable printing
104106
private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##",
105107
DecimalFormatSymbols.getInstance(Locale.ENGLISH));
@@ -529,6 +531,16 @@ public static int murmur2(final byte[] data) {
529531
return h;
530532
}
531533

534+
/**
535+
* Extracts the protocol from a "protocol://host:port" address string.
536+
* @param address address string to parse
537+
* @return hostname or null if the given address is incorrect
538+
*/
539+
public static String getProtocol(String address) {
540+
Matcher matcher = PROTOCOL_PATTERN.matcher(address);
541+
return matcher.lookingAt() ? matcher.group(1) : null;
542+
}
543+
532544
/**
533545
* Extracts the hostname from a "host:port" address string.
534546
* @param address address string to parse

clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.apache.kafka.common.utils.Utils.formatBytes;
7878
import static org.apache.kafka.common.utils.Utils.getHost;
7979
import static org.apache.kafka.common.utils.Utils.getPort;
80+
import static org.apache.kafka.common.utils.Utils.getProtocol;
8081
import static org.apache.kafka.common.utils.Utils.intersection;
8182
import static org.apache.kafka.common.utils.Utils.mkEntry;
8283
import static org.apache.kafka.common.utils.Utils.murmur2;
@@ -131,6 +132,18 @@ public void testGetHostValid(String protocol) {
131132
assertEquals("::1", getHost("[::1]:1234"));
132133
}
133134

135+
@ParameterizedTest
136+
@CsvSource(value = {"PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL"})
137+
public void testGetProtocol(String protocol) {
138+
assertEquals(protocol, getProtocol(protocol + "://mydomain.com:8080"));
139+
assertEquals(protocol, getProtocol(protocol + "://MyDomain.com:8080"));
140+
assertEquals(protocol, getProtocol(protocol + "://My_Domain.com:8080"));
141+
assertEquals(protocol, getProtocol(protocol + "://[::1]:1234"));
142+
assertEquals(protocol, getProtocol(protocol + "://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
143+
assertEquals(protocol, getProtocol(protocol + "://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"));
144+
assertEquals(protocol, getProtocol(protocol + "://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
145+
}
146+
134147
@ParameterizedTest
135148
@CsvSource(value = {"PLAINTEXT", "SASL_PLAINTEXT", "SSL", "SASL_SSL"})
136149
public void testGetHostInvalid(String protocol) {

core/src/main/scala/kafka/cluster/Broker.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node
2424
import org.apache.kafka.common.network.ListenerName
2525
import org.apache.kafka.common.security.auth.SecurityProtocol
2626
import org.apache.kafka.server.network.BrokerEndPoint
27+
import org.apache.kafka.network.EndPoint
2728

2829
import scala.collection.Seq
2930

@@ -59,7 +60,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], featu
5960
s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull} : $features"
6061

6162
def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = {
62-
this(id, Seq(EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures)
63+
this(id, Seq(new EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures)
6364
}
6465

6566
def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = {

core/src/main/scala/kafka/cluster/EndPoint.scala

-59
This file was deleted.

core/src/main/scala/kafka/network/SocketServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ import java.util
2525
import java.util.Optional
2626
import java.util.concurrent._
2727
import java.util.concurrent.atomic._
28-
import kafka.cluster.EndPoint
2928
import kafka.network.Processor._
3029
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
3130
import kafka.network.SocketServer._
3231
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
32+
import org.apache.kafka.network.EndPoint
3333
import org.apache.kafka.common.message.ApiMessageType.ListenerType
3434
import kafka.utils._
3535
import org.apache.kafka.common.config.ConfigException

core/src/main/scala/kafka/server/BrokerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ class BrokerServer(
274274
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
275275

276276
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
277-
config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava).
277+
config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
278278
withWildcardHostnamesResolved().
279279
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
280280

core/src/main/scala/kafka/server/ControllerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class ControllerServer(
171171
sharedServer.socketFactory)
172172

173173
val listenerInfo = ListenerInfo
174-
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
174+
.create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
175175
.withWildcardHostnamesResolved()
176176
.withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
177177
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ import java.util
2121
import java.util.{Collections, Properties}
2222
import java.util.concurrent.CopyOnWriteArrayList
2323
import java.util.concurrent.locks.ReentrantReadWriteLock
24-
import kafka.cluster.EndPoint
2524
import kafka.log.{LogCleaner, LogManager}
2625
import kafka.network.{DataPlaneAcceptor, SocketServer}
2726
import kafka.server.DynamicBrokerConfig._
2827
import kafka.utils.{CoreUtils, Logging}
2928
import org.apache.kafka.common.Reconfigurable
29+
import org.apache.kafka.network.EndPoint
3030
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
3131
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
3232
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}

core/src/main/scala/kafka/server/KafkaConfig.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kafka.server
2020
import java.util
2121
import java.util.concurrent.TimeUnit
2222
import java.util.Properties
23-
import kafka.cluster.EndPoint
2423
import kafka.utils.{CoreUtils, Logging}
2524
import kafka.utils.Implicits._
2625
import org.apache.kafka.common.Reconfigurable
@@ -33,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType
3332
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
3433
import org.apache.kafka.common.security.auth.SecurityProtocol
3534
import org.apache.kafka.common.utils.Utils
35+
import org.apache.kafka.network.EndPoint
3636
import org.apache.kafka.coordinator.group.Group.GroupType
3737
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
3838
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}

core/src/main/scala/kafka/utils/CoreUtils.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import com.typesafe.scalalogging.Logger
2727
import javax.management._
2828
import scala.collection._
2929
import scala.collection.Seq
30-
import kafka.cluster.EndPoint
30+
import org.apache.kafka.network.EndPoint
3131
import org.apache.commons.validator.routines.InetAddressValidator
3232
import org.apache.kafka.common.network.ListenerName
3333
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -209,7 +209,7 @@ object CoreUtils {
209209

210210
val endPoints = try {
211211
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava).
212-
asScala.map(EndPoint.fromJava(_))
212+
asScala.map(EndPoint.fromPublic(_))
213213
} catch {
214214
case e: Exception =>
215215
throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)

core/src/test/scala/unit/kafka/network/SocketServerTest.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.network
1919

2020
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
2121
import com.yammer.metrics.core.{Gauge, Meter}
22-
import kafka.cluster.EndPoint
2322
import kafka.server._
2423
import kafka.utils.Implicits._
2524
import kafka.utils.TestUtils
@@ -37,6 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
3736
import org.apache.kafka.common.utils._
3837
import org.apache.kafka.network.RequestConvertToJson
3938
import org.apache.kafka.network.SocketServerConfigs
39+
import org.apache.kafka.network.EndPoint
4040
import org.apache.kafka.security.CredentialProvider
4141
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
4242
import org.apache.kafka.server.config.QuotaConfig
@@ -1857,7 +1857,7 @@ class SocketServerTest {
18571857
val failedFuture = new CompletableFuture[Void]()
18581858
failedFuture.completeExceptionally(new RuntimeException("authorizer startup failed"))
18591859
assertThrows(classOf[ExecutionException], () => {
1860-
newServer.enableRequestProcessing(Map(endpoint.toJava -> failedFuture)).get()
1860+
newServer.enableRequestProcessing(Map(endpoint.toPublic -> failedFuture)).get()
18611861
})
18621862
} finally {
18631863
shutdownServerAndMetrics(newServer)
@@ -1890,7 +1890,7 @@ class SocketServerTest {
18901890
val authorizerFuture = new CompletableFuture[Void]()
18911891
val enableFuture = newServer.enableRequestProcessing(
18921892
newServer.dataPlaneAcceptors.keys().asScala.
1893-
map(_.toJava).map(k => k -> authorizerFuture).toMap)
1893+
map(_.toPublic).map(k => k -> authorizerFuture).toMap)
18941894
assertFalse(authorizerFuture.isDone)
18951895
assertFalse(enableFuture.isDone)
18961896
newServer.dataPlaneAcceptors.values().forEach(a => assertNull(a.serverChannel))

core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class ControllerRegistrationManagerTest {
7676
"controller-registration-manager-test-",
7777
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
7878
RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(),
79-
ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava),
79+
ListenerInfo.create(context.config.controllerListeners.map(_.toPublic).asJava),
8080
new ExponentialBackoff(1, 2, 100, 0.02))
8181
}
8282

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+12-12
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kafka.server
2020
import java.net.InetSocketAddress
2121
import java.util
2222
import java.util.{Arrays, Collections, Properties}
23-
import kafka.cluster.EndPoint
2423
import kafka.utils.TestUtils.assertBadConfigContainingMessage
2524
import kafka.utils.{CoreUtils, TestUtils}
2625
import org.apache.kafka.common.Node
@@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
3635
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
3736
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
3837
import org.apache.kafka.network.SocketServerConfigs
38+
import org.apache.kafka.network.EndPoint
3939
import org.apache.kafka.raft.QuorumConfig
4040
import org.apache.kafka.server.common.MetadataVersion
4141
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
@@ -344,7 +344,7 @@ class KafkaConfigTest {
344344

345345
val config = KafkaConfig.fromProps(props)
346346
assertEquals(
347-
Seq(EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
347+
Seq(new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
348348
config.effectiveAdvertisedControllerListeners
349349
)
350350
}
@@ -360,7 +360,7 @@ class KafkaConfigTest {
360360

361361
val config = KafkaConfig.fromProps(props)
362362
assertEquals(
363-
Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
363+
Seq(new EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
364364
config.effectiveAdvertisedControllerListeners
365365
)
366366
}
@@ -378,8 +378,8 @@ class KafkaConfigTest {
378378
val config = KafkaConfig.fromProps(props)
379379
assertEquals(
380380
Seq(
381-
EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
382-
EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
381+
new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
382+
new EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
383383
),
384384
config.effectiveAdvertisedControllerListeners
385385
)
@@ -508,9 +508,9 @@ class KafkaConfigTest {
508508
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
509509
val config = KafkaConfig.fromProps(props)
510510
val expectedListeners = Seq(
511-
EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
512-
EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
513-
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
511+
new EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
512+
new EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
513+
new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
514514
assertEquals(expectedListeners, config.listeners)
515515
assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
516516
val expectedSecurityProtocolMap = Map(
@@ -537,14 +537,14 @@ class KafkaConfigTest {
537537
val config = KafkaConfig.fromProps(props)
538538

539539
val expectedListeners = Seq(
540-
EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
541-
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
540+
new EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
541+
new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
542542
)
543543
assertEquals(expectedListeners, config.listeners)
544544

545545
val expectedAdvertisedListeners = Seq(
546-
EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
547-
EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
546+
new EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
547+
new EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
548548
)
549549
assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners)
550550

0 commit comments

Comments
 (0)