Skip to content

Commit 6501a1e

Browse files
committed
[FLINK-36278] Unify container creation in Kafka tests
Using KafkaUtil will guarantee unified log settings.
1 parent ebf052b commit 6501a1e

File tree

10 files changed

+39
-50
lines changed

10 files changed

+39
-50
lines changed

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.time.Deadline;
2222
import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
23+
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
2324
import org.apache.flink.connector.testframe.container.FlinkContainers;
2425
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
2526
import org.apache.flink.test.resources.ResourceTestUtils;
@@ -44,7 +45,6 @@
4445
import org.slf4j.LoggerFactory;
4546
import org.testcontainers.containers.KafkaContainer;
4647
import org.testcontainers.containers.Network;
47-
import org.testcontainers.containers.output.Slf4jLogConsumer;
4848
import org.testcontainers.utility.DockerImageName;
4949

5050
import java.nio.file.Path;
@@ -61,7 +61,6 @@
6161
/** End-to-end test for SQL client using Avro Confluent Registry format. */
6262
public class SQLClientSchemaRegistryITCase {
6363
private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
64-
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
6564

6665
public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
6766
public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
@@ -78,10 +77,9 @@ public class SQLClientSchemaRegistryITCase {
7877

7978
@ClassRule
8079
public static final KafkaContainer KAFKA =
81-
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
80+
KafkaUtil.createKafkaContainer()
8281
.withNetwork(NETWORK)
83-
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
84-
.withLogConsumer(LOG_CONSUMER);
82+
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
8583

8684
@ClassRule
8785
public static final SchemaRegistryContainer REGISTRY =

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.UUID;
6363
import java.util.stream.Collectors;
6464

65-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
6665
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
6766
import static org.assertj.core.api.Assertions.assertThat;
6867

@@ -78,7 +77,7 @@ class SmokeKafkaITCase {
7877

7978
@Container
8079
public static final KafkaContainer KAFKA_CONTAINER =
81-
createKafkaContainer(KAFKA)
80+
createKafkaContainer()
8281
.withEmbeddedZookeeper()
8382
.withNetwork(NETWORK)
8483
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import java.util.function.Consumer;
4848
import java.util.stream.Collectors;
4949

50-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
5150
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
5251
import static org.assertj.core.api.Assertions.assertThat;
5352
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -61,7 +60,7 @@ class FlinkKafkaInternalProducerITCase {
6160

6261
@Container
6362
private static final KafkaContainer KAFKA_CONTAINER =
64-
createKafkaContainer(KAFKA).withEmbeddedZookeeper();
63+
createKafkaContainer().withEmbeddedZookeeper();
6564

6665
@Test
6766
void testInitTransactionId() {

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.util.stream.Collectors;
102102
import java.util.stream.LongStream;
103103

104-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
105104
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
106105
import static org.assertj.core.api.Assertions.assertThat;
107106
import static org.assertj.core.api.Assertions.fail;
@@ -124,7 +123,7 @@ public class KafkaSinkITCase extends TestLogger {
124123

125124
@ClassRule
126125
public static final KafkaContainer KAFKA_CONTAINER =
127-
createKafkaContainer(KAFKA)
126+
createKafkaContainer()
128127
.withEmbeddedZookeeper()
129128
.withNetwork(NETWORK)
130129
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
4545
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
4646
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
47-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
4847
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
4948
import static org.assertj.core.api.Assertions.assertThat;
5049

@@ -57,7 +56,7 @@ public class KafkaTransactionLogITCase extends TestLogger {
5756

5857
@ClassRule
5958
public static final KafkaContainer KAFKA_CONTAINER =
60-
createKafkaContainer(KAFKA).withEmbeddedZookeeper();
59+
createKafkaContainer().withEmbeddedZookeeper();
6160

6261
private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
6362

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.ScheduledFuture;
5656
import java.util.function.Consumer;
5757

58-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
5958
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
6059

6160
/** Test base for KafkaWriter. */
@@ -73,7 +72,7 @@ public abstract class KafkaWriterTestBase {
7372
protected TriggerTimeService timeService;
7473

7574
protected static final KafkaContainer KAFKA_CONTAINER =
76-
createKafkaContainer(KAFKA)
75+
createKafkaContainer()
7776
.withEmbeddedZookeeper()
7877
.withNetwork(NETWORK)
7978
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,38 @@ private KafkaUtil() {}
5151
* This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
5252
* levels with the ones used by the capturing logger.
5353
*
54-
* @param dockerImageVersion describing the Kafka image
5554
* @return configured Kafka container
5655
*/
57-
public static KafkaContainer createKafkaContainer(String dockerImageVersion) {
58-
return createKafkaContainer(dockerImageVersion, "Kafka");
56+
public static KafkaContainer createKafkaContainer() {
57+
return createKafkaContainer("Kafka");
5958
}
6059

6160
/**
6261
* This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
6362
* levels with the ones used by the capturing logger, and set the prefix of logger.
6463
*/
65-
public static KafkaContainer createKafkaContainer(
66-
String dockerImageVersion, String containerName) {
64+
public static KafkaContainer createKafkaContainer(String containerName) {
65+
Logger logger = createLogger(containerName);
6766

68-
Logger logger = LoggerFactory.getLogger("container." + containerName);
67+
String logLevel = inferLogLevel(logger);
6968

69+
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
70+
return new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
71+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
72+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
73+
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
74+
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
75+
.withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
76+
.withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
77+
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
78+
.withEnv(
79+
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
80+
String.valueOf(Duration.ofHours(2).toMillis()))
81+
.withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
82+
.withLogConsumer(logConsumer);
83+
}
84+
85+
public static String inferLogLevel(Logger logger) {
7086
String logLevel;
7187
if (logger.isTraceEnabled()) {
7288
logLevel = "TRACE";
@@ -81,21 +97,11 @@ public static KafkaContainer createKafkaContainer(
8197
} else {
8298
logLevel = "OFF";
8399
}
100+
return logLevel;
101+
}
84102

85-
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
86-
return new KafkaContainer(DockerImageName.parse(dockerImageVersion))
87-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
88-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
89-
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
90-
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
91-
.withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
92-
.withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
93-
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
94-
.withEnv(
95-
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
96-
String.valueOf(Duration.ofHours(2).toMillis()))
97-
.withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
98-
.withLogConsumer(logConsumer);
103+
public static Logger createLogger(String containerName) {
104+
return LoggerFactory.getLogger("container." + containerName);
99105
}
100106

101107
/**

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ private KafkaContainer createKafkaContainer(
434434
int brokerID, @Nullable GenericContainer<?> zookeeper) {
435435
String brokerName = String.format("Kafka-%d", brokerID);
436436
KafkaContainer broker =
437-
KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, brokerName)
437+
KafkaUtil.createKafkaContainer(brokerName)
438438
.withNetworkAliases(brokerName)
439439
.withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
440440
.withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024))

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.function.Function;
4343
import java.util.stream.Stream;
4444

45-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
4645
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
4746

4847
@Testcontainers
@@ -55,7 +54,7 @@ class KafkaMetricMutableWrapperTest {
5554

5655
@Container
5756
public static final KafkaContainer KAFKA_CONTAINER =
58-
createKafkaContainer(KAFKA)
57+
createKafkaContainer()
5958
.withEmbeddedZookeeper()
6059
.withNetwork(NETWORK)
6160
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.streaming.connectors.kafka.table;
2020

2121
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
22-
import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
22+
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
2323
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2424
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
2525
import org.apache.flink.test.util.AbstractTestBase;
@@ -41,8 +41,6 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343
import org.testcontainers.containers.KafkaContainer;
44-
import org.testcontainers.containers.output.Slf4jLogConsumer;
45-
import org.testcontainers.utility.DockerImageName;
4644

4745
import java.time.Duration;
4846
import java.util.ArrayList;
@@ -65,15 +63,8 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
6563

6664
@ClassRule
6765
public static final KafkaContainer KAFKA_CONTAINER =
68-
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) {
69-
@Override
70-
protected void doStart() {
71-
super.doStart();
72-
if (LOG.isInfoEnabled()) {
73-
this.followOutput(new Slf4jLogConsumer(LOG));
74-
}
75-
}
76-
}.withEmbeddedZookeeper()
66+
KafkaUtil.createKafkaContainer()
67+
.withEmbeddedZookeeper()
7768
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
7869
.withEnv(
7970
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",

0 commit comments

Comments (0)