diff --git a/testing/build.gradle b/testing/build.gradle index 7a23908a..fe0c4d71 100644 --- a/testing/build.gradle +++ b/testing/build.gradle @@ -14,24 +14,19 @@ * under the License. */ -ext { - noPublish = true -} dependencies { - implementation project(':processor') - implementation project(':protobuf') + compile project(':processor') + compile project(':protobuf') - implementation "org.apache.kafka:kafka-clients:$kafkaVersion" - implementation "org.apache.kafka:kafka_2.12:$kafkaVersion" - implementation "org.apache.kafka:kafka-clients:$kafkaVersion:test" - implementation "org.apache.kafka:kafka_2.12:$kafkaVersion:test" + compile "org.apache.kafka:kafka-clients:$kafkaVersion" + compile "org.apache.kafka:kafka_2.12:$kafkaVersion" - implementation('org.apache.zookeeper:zookeeper:3.5.6') { + compile('org.apache.zookeeper:zookeeper:3.5.6') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'log4j', module: 'log4j' } - implementation "junit:junit:$junitVersion" - implementation "org.hamcrest:hamcrest-all:$hamcrestVersion" + compile "junit:junit:$junitVersion" + compile "org.hamcrest:hamcrest-all:$hamcrestVersion" runtimeOnly "ch.qos.logback:logback-classic:1.2.3" } diff --git a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java index a68625ed..5f2d9f71 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/EmbeddedKafkaCluster.java @@ -16,17 +16,20 @@ package com.linecorp.decaton.testing; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.CoreUtils; -import kafka.utils.TestUtils; import lombok.Getter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -48,8 +51,13 @@ public EmbeddedKafkaCluster(int numBrokers, String zkConnect) { for (int i = 0; i < numBrokers; i++) { Properties prop = createBrokerConfig(i, zkConnect); - KafkaServer server = TestUtils.createServer(KafkaConfig.fromProps(prop), Time.SYSTEM); - int port = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT); + + KafkaServer server = new KafkaServer(KafkaConfig.fromProps(prop), + Time.SYSTEM, + Option.empty(), + scala.collection.immutable.List.empty()); + server.startup(); + int port = server.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); String listener = "127.0.0.1:" + port; listeners.add(listener); servers.add(server); @@ -61,24 +69,32 @@ public EmbeddedKafkaCluster(int numBrokers, String zkConnect) { } private static Properties createBrokerConfig(int brokerId, String zkConnect) { - return TestUtils.createBrokerConfig(brokerId, zkConnect, - false, // disable controlled shutdown - true, // enable delete topic - 0, // use random port + Properties prop = new Properties(); - // << enable only PLAINTEXT - Option.empty(), - Option.empty(), Option.empty(), - true, false, 0, - false, 0, false, 0, - // enable only PLAINTEXT >> + prop.setProperty("broker.id", String.valueOf(brokerId)); + prop.setProperty("zookeeper.connect", zkConnect); + prop.setProperty("controlled.shutdown.enable", "false"); + prop.setProperty("delete.topic.enable", "true"); + prop.setProperty("listeners", "PLAINTEXT://localhost:0"); + try { + prop.setProperty("log.dir", + Files.createTempDirectory("zookeeper-logs").toFile().getAbsolutePath()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + prop.setProperty("num.partitions", "1"); + prop.setProperty("default.replication.factor", "3"); + prop.setProperty("zookeeper.connection.timeout.ms", "10000"); + prop.setProperty("replica.socket.timeout.ms", "1500"); + prop.setProperty("controller.socket.timeout.ms", "1500"); + prop.setProperty("log.segment.delete.delay.ms", "1000"); + prop.setProperty("log.cleaner.dedupe.buffer.size", "2097152"); + prop.setProperty("message.timestamp.difference.max.ms", String.valueOf(Long.MAX_VALUE)); + prop.setProperty("offsets.topic.replication.factor", "1"); + prop.setProperty("offsets.topic.num.partitions", "5"); + prop.setProperty("group.initial.rebalance.delay.ms", "0"); - Option.empty(), // omit rack information - 1, // logDir count - false, // disable delegation token - 1, // num partitions - (short) 1 // default replication factor - ); + return prop; } @Override