[FLINK-36278] Reduce log size #119

Merged · 6 commits · Sep 23, 2024
SQLClientSchemaRegistryITCase.java

@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.connector.testframe.container.FlinkContainers;
 import org.apache.flink.connector.testframe.container.TestcontainersSettings;
 import org.apache.flink.test.resources.ResourceTestUtils;
@@ -40,11 +41,8 @@
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.nio.file.Path;
@@ -60,9 +58,6 @@
 
 /** End-to-end test for SQL client using Avro Confluent Registry format. */
 public class SQLClientSchemaRegistryITCase {
-    private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
-    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
-
     public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
     private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
@@ -78,10 +73,9 @@ public class SQLClientSchemaRegistryITCase {
 
     @ClassRule
    public static final KafkaContainer KAFKA =
-            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+            KafkaUtil.createKafkaContainer(SQLClientSchemaRegistryITCase.class)
                     .withNetwork(NETWORK)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
-                    .withLogConsumer(LOG_CONSUMER);
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
     @ClassRule
     public static final SchemaRegistryContainer REGISTRY =
@@ -92,7 +86,11 @@ public class SQLClientSchemaRegistryITCase {
                     .dependsOn(KAFKA);
 
     public final TestcontainersSettings testcontainersSettings =
-            TestcontainersSettings.builder().network(NETWORK).logger(LOG).dependsOn(KAFKA).build();
+            TestcontainersSettings.builder()
+                    .network(NETWORK)
+                    .logger(KafkaUtil.getLogger("flink", SQLClientSchemaRegistryITCase.class))
+                    .dependsOn(KAFKA)
+                    .build();
 
     public final FlinkContainers flink =
             FlinkContainers.builder().withTestcontainersSettings(testcontainersSettings).build();
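The swap above recurs throughout the PR: per-test SLF4J loggers and explicit Slf4jLogConsumer wiring are replaced by KafkaUtil.createKafkaContainer(Class) and KafkaUtil.getLogger(String, Class), with the same pattern repeating in SmokeKafkaITCase, FlinkKafkaInternalProducerITCase, KafkaSinkITCase, KafkaTransactionLogITCase, and KafkaWriterTestBase below. The actual KafkaUtil implementation is not part of this diff; the following is a minimal sketch of how such helpers might look, assuming a "container.<component>.<TestClass>" logger naming scheme inferred from the container/container.kafka/container.flink loggers in the log4j2 config further down:

import org.apache.flink.connector.kafka.testutils.DockerImageVersions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

public final class KafkaUtilSketch {

    private KafkaUtilSketch() {}

    // Assumed naming scheme: "container.<component>.<TestClass>" puts the logger
    // under the "container" hierarchy, which the log4j2 test config switches OFF
    // by default so container output no longer bloats the test logs.
    public static Logger getLogger(String component, Class<?> testClass) {
        return LoggerFactory.getLogger(
                "container." + component + "." + testClass.getSimpleName());
    }

    // One factory call per test class instead of a per-class LOG constant plus an
    // explicit Slf4jLogConsumer, as in the diff above.
    public static KafkaContainer createKafkaContainer(Class<?> testClass) {
        return new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
                .withLogConsumer(new Slf4jLogConsumer(getLogger("kafka", testClass)));
    }
}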
SmokeKafkaITCase.java

@@ -45,8 +45,6 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
@@ -62,7 +60,6 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -71,20 +68,22 @@
 @Testcontainers
 class SmokeKafkaITCase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
     private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
 
     @Container
     public static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KAFKA, LOG)
+            createKafkaContainer(SmokeKafkaITCase.class)
                     .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
     public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
-            TestcontainersSettings.builder().logger(LOG).dependsOn(KAFKA_CONTAINER).build();
+            TestcontainersSettings.builder()
+                    .logger(KafkaUtil.getLogger("flink", SmokeKafkaITCase.class))
+                    .dependsOn(KAFKA_CONTAINER)
+                    .build();
 
     @RegisterExtension
     public static final FlinkContainers FLINK =
Log4j2 test properties

@@ -32,3 +32,27 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
 #logger.yarn.name = org.testcontainers.shaded.com.github.dockerjava.core
 #logger.yarn.level = WARN
 #logger.yarn.appenderRef.console.ref = TestLogger
+
+# Logger configuration for containers, by default this is off
+# If you want to investigate test failures, overwrite the level as above
+logger.container.name = container
+logger.container.level = OFF
+logger.container.additivity = false # This prevents messages from being logged by the root logger
+logger.container.appenderRef.containerappender.ref = ContainerLogger
+
+logger.kafkacontainer.name = container.kafka
+logger.kafkacontainer.level = OFF
+
+logger.flinkcontainer.name = container.flink
+logger.flinkcontainer.level = OFF
+
+logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
+logger.flinkenv.level = OFF
+logger.flinkenv.additivity = false # This prevents messages from being logged by the root logger
+logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
+
+appender.containerappender.name = ContainerLogger
+appender.containerappender.type = CONSOLE
+appender.containerappender.target = SYSTEM_ERR
+appender.containerappender.layout.type = PatternLayout
+appender.containerappender.layout.pattern = [%c{1}] %m%n
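With these loggers OFF and additivity disabled, container output never reaches the root logger. Per the comment in the config, investigating a failing test means overriding the relevant level locally; for example (a local debugging tweak, not part of this PR):

# Local override: turn Kafka container output back on. Events raised under
# "container.kafka" still reach the dedicated ContainerLogger appender through
# the parent "container" logger, whose additivity=false keeps them off the
# root logger.
logger.kafkacontainer.name = container.kafka
logger.kafkacontainer.level = INFO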
flink-connector-kafka/pom.xml (26 changes: 0 additions & 26 deletions)

@@ -133,32 +133,6 @@ under the License.
             <type>test-jar</type>
         </dependency>
 
-
-        <dependency>
-            <!-- include kafka server for tests -->
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_${scala.binary.version}</artifactId>
-            <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.dropwizard.metrics</groupId>
-                    <artifactId>metrics-core</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>${zookeeper.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
@@ -138,7 +138,7 @@ public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
         }
 
         if (restartingReaders.get()) {
-            logger.info("Poll next invoked while restarting readers");
+            logger.debug("Poll next invoked while restarting readers");
             return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
         }
 
@@ -19,8 +19,8 @@
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -130,21 +130,18 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
             // topics or a topic pattern
             if (topicsDescriptor.isFixedTopics()) {
                 newDiscoveredPartitions =
-                        getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
+                        new ArrayList<>(
+                                getAllPartitionsForTopics(topicsDescriptor.getFixedTopics()));
             } else {
-                List<String> matchedTopics = getAllTopics();
+                List<String> matchedTopics = new ArrayList<>(getAllTopics());
 
                 // retain topics that match the pattern
-                Iterator<String> iter = matchedTopics.iterator();
-                while (iter.hasNext()) {
-                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
-                        iter.remove();
-                    }
-                }
+                matchedTopics.removeIf(s -> !topicsDescriptor.isMatchingTopic(s));
 
-                if (matchedTopics.size() != 0) {
+                if (!matchedTopics.isEmpty()) {
                     // get partitions only for matched topics
-                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
+                    newDiscoveredPartitions =
+                            new ArrayList<>(getAllPartitionsForTopics(matchedTopics));
                 } else {
                     newDiscoveredPartitions = null;
                 }
@@ -157,14 +154,8 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
                     "Unable to retrieve any partitions with KafkaTopicsDescriptor: "
                             + topicsDescriptor);
         } else {
-            Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
-            KafkaTopicPartition nextPartition;
-            while (iter.hasNext()) {
-                nextPartition = iter.next();
-                if (!setAndCheckDiscoveredPartition(nextPartition)) {
-                    iter.remove();
-                }
-            }
+            newDiscoveredPartitions.removeIf(
+                    nextPartition -> !setAndCheckDiscoveredPartition(nextPartition));
        }
 
         return newDiscoveredPartitions;
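The removeIf rewrite above is also what motivates the new ArrayList<>(...) copies: List.removeIf mutates the list in place, and the lists returned by getAllTopics()/getAllPartitionsForTopics() are not guaranteed to be mutable. A standalone illustration of the same filter shape (hypothetical topic names, not from the PR):

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class RemoveIfExample {
    public static void main(String[] args) {
        Pattern topicPattern = Pattern.compile("input-.*");

        // List.of(...) returns an immutable list; calling removeIf on it directly
        // would throw UnsupportedOperationException, hence the defensive copy.
        List<String> topics = new ArrayList<>(List.of("input-a", "input-b", "other"));

        // Same shape as the discoverer's filter: drop topics that do not match.
        topics.removeIf(topic -> !topicPattern.matcher(topic).matches());

        System.out.println(topics); // prints [input-a, input-b]
    }
}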
FlinkKafkaInternalProducerITCase.java

@@ -34,8 +34,6 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -47,7 +45,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,12 +53,9 @@
 @ExtendWith(TestLoggerExtension.class)
 class FlinkKafkaInternalProducerITCase {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(FlinkKafkaInternalProducerITCase.class);
-
     @Container
     private static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
+            createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper();
 
     @Test
     void testInitTransactionId() {
KafkaSinkITCase.java

@@ -101,7 +101,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -124,7 +123,7 @@ public class KafkaSinkITCase extends TestLogger {
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KAFKA, LOG)
+            createKafkaContainer(KafkaSinkITCase.class)
                     .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
KafkaTransactionLogITCase.java

@@ -44,7 +44,6 @@
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
-import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -57,7 +56,7 @@ public class KafkaTransactionLogITCase extends TestLogger {
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
+            createKafkaContainer(KafkaTransactionLogITCase.class).withEmbeddedZookeeper();
 
     private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
KafkaWriterTestBase.java

@@ -55,7 +55,6 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 
-import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 
 /** Test base for KafkaWriter. */
@@ -73,7 +72,7 @@ public abstract class KafkaWriterTestBase {
     protected TriggerTimeService timeService;
 
     protected static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KAFKA, LOG)
+            createKafkaContainer(KafkaWriterTestBase.class)
                     .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
@@ -240,7 +240,7 @@ public void writeRecords(List<String> records) {
             }
         }
 
-        logger.info("Writing producer records: {}", producerRecords);
+        logger.debug("Writing producer records: {}", producerRecords);
 
         DynamicKafkaSourceTestHelper.produceToKafka(
                 clusterPropertiesMap.get(cluster),