Skip to content

Commit

Permalink
[FLINK-36278] Reduce log size by avoiding container logs by default
Browse files Browse the repository at this point in the history
Currently, container logs appear under an o.a.f logger and thus are visible on CI. This results in compressed log size >40MB for a run and often leads to download errors.

This PR reroutes container logs to a special container logger. It also uses a custom format to significantly reduce the size of each log line. The logs for containers are disabled by default.
  • Loading branch information
AHeise committed Sep 16, 2024
1 parent 7929b16 commit bc5f8b9
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class SmokeKafkaITCase {

@Container
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
createKafkaContainer(KAFKA)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class FlinkKafkaInternalProducerITCase {

@Container
private static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
createKafkaContainer(KAFKA).withEmbeddedZookeeper();

@Test
void testInitTransactionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class KafkaSinkITCase extends TestLogger {

@ClassRule
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
createKafkaContainer(KAFKA)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class KafkaTransactionLogITCase extends TestLogger {

@ClassRule
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
createKafkaContainer(KAFKA).withEmbeddedZookeeper();

private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class KafkaWriterTestBase {
protected TriggerTimeService timeService;

protected static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
createKafkaContainer(KAFKA)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.connector.kafka.testutils;

import org.apache.flink.util.StringUtils;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -54,19 +52,21 @@ private KafkaUtil() {}
* levels with the ones used by the capturing logger.
*
* @param dockerImageVersion describing the Kafka image
* @param logger to derive the log level from
* @return configured Kafka container
*/
public static KafkaContainer createKafkaContainer(String dockerImageVersion, Logger logger) {
return createKafkaContainer(dockerImageVersion, logger, null);
public static KafkaContainer createKafkaContainer(String dockerImageVersion) {
return createKafkaContainer(dockerImageVersion, "Kafka");
}

/**
* This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
* levels with the ones used by the capturing logger, and set the prefix of logger.
*/
public static KafkaContainer createKafkaContainer(
String dockerImageVersion, Logger logger, String loggerPrefix) {
String dockerImageVersion, String containerName) {

Logger logger = LoggerFactory.getLogger("container." + containerName);

String logLevel;
if (logger.isTraceEnabled()) {
logLevel = "TRACE";
Expand All @@ -82,10 +82,7 @@ public static KafkaContainer createKafkaContainer(
logLevel = "OFF";
}

Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger);
if (!StringUtils.isNullOrWhitespaceOnly(loggerPrefix)) {
logConsumer.withPrefix(loggerPrefix);
}
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
return new KafkaContainer(DockerImageName.parse(dockerImageVersion))
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ protected Properties createProperties() {
Properties properties = new Properties();
properties.putAll(standardProps);
properties.putAll(secureProps);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
properties.put(FlinkKafkaProducer.KEY_DISABLE_METRICS, "true");
return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ private KafkaContainer createKafkaContainer(
int brokerID, @Nullable GenericContainer<?> zookeeper) {
String brokerName = String.format("Kafka-%d", brokerID);
KafkaContainer broker =
KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, LOG, brokerName)
KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, brokerName)
.withNetworkAliases(brokerName)
.withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
.withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class KafkaMetricMutableWrapperTest {

@Container
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
createKafkaContainer(KAFKA)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
Expand Down
14 changes: 12 additions & 2 deletions flink-connector-kafka/src/test/resources/log4j2-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,15 @@ logger.zookeeper.level = OFF
logger.I0Itec.name = org.I0Itec
logger.I0Itec.level = OFF

logger.splitreader.name = org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader
logger.splitreader.level = DEBUG
# Logger configuration for containers, by default this is off
# If you want to investigate test failures, overwrite the level as above
logger.containerlogger.name = container
logger.containerlogger.level = OFF
logger.containerlogger.additivity = false # This prevents messages from being logged by the root logger
logger.containerlogger.appenderRef.containerappender.ref = ContainerLogger

appender.containerappender.name = ContainerLogger
appender.containerappender.type = CONSOLE
appender.containerappender.target = SYSTEM_ERR
appender.containerappender.layout.type = PatternLayout
appender.containerappender.layout.pattern = [%c{1}] %m%n

0 comments on commit bc5f8b9

Please sign in to comment.