Commit e460f50

Fixing producer leak
1 parent b909048 commit e460f50

11 files changed: +130 -80 lines

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java

+7

@@ -30,6 +30,7 @@
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -45,6 +46,7 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -57,6 +59,11 @@ class FlinkKafkaInternalProducerITCase {
     private static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper();
 
+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
     @Test
     void testInitTransactionId() {
         final String topic = "test-init-transactions";
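
Note on the pattern: the @AfterEach hook above recurs across the JUnit 5 test classes in this commit, while the JUnit 4 classes (KafkaSinkITCase, KafkaTransactionLogITCase, KafkaSourceLegacyITCase) add the same call to an @After method. Since these hooks run even when the test body throws, the check also catches producers leaked by failing tests, which the deleted inline call in KafkaSinkITCase#executeWithMapper (further down) could not. A minimal sketch of the hook, as added above:

    // Runs after every test, pass or fail; fails the test if a Kafka
    // producer network thread outlived the test body.
    @AfterEach
    public void check() {
        checkProducerLeak();
    }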

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java

+8 -1

@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -32,16 +33,22 @@
 import java.util.Collections;
 import java.util.Properties;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaCommitter}. */
 @ExtendWith({TestLoggerExtension.class})
-public class KafkaCommitterTest {
+class KafkaCommitterTest {
 
     private static final int PRODUCER_ID = 0;
     private static final short EPOCH = 0;
     private static final String TRANSACTIONAL_ID = "transactionalId";
 
+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
     /** Causes a network error by inactive broker and tests that a retry will happen. */
     @Test
     public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java

-1

@@ -52,7 +52,6 @@ public void testPropertyHandling() {
                 getBasicBuilder().setProperty("k1", "v1"),
                 p -> {
                     Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
-                    p.containsKey("k1");
                 });
 
         Properties testConf = new Properties();

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

+2 -37

@@ -86,7 +86,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -101,9 +100,9 @@
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
 public class KafkaSinkITCase extends TestLogger {
@@ -158,6 +157,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce
 
     @After
     public void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
+        checkProducerLeak();
         deleteTestTopic(topic);
     }
 
@@ -329,7 +329,6 @@ private void executeWithMapper(
         builder.setTransactionalIdPrefix(transactionalIdPrefix);
         stream.sinkTo(builder.build());
         env.execute();
-        checkProducerLeak();
     }
 
     private void testRecoveryWithAssertion(
@@ -600,40 +599,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
        public void initializeState(FunctionInitializationContext context) throws Exception {}
     }
 
-    private void checkProducerLeak() throws InterruptedException {
-        List<Map.Entry<Thread, StackTraceElement[]>> leaks = null;
-        for (int tries = 0; tries < 10; tries++) {
-            leaks =
-                    Thread.getAllStackTraces().entrySet().stream()
-                            .filter(this::findAliveKafkaThread)
-                            .collect(Collectors.toList());
-            if (leaks.isEmpty()) {
-                return;
-            }
-            Thread.sleep(1000);
-        }
-
-        for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
-            leak.getKey().stop();
-        }
-        fail(
-                "Detected producer leaks:\n"
-                        + leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
-    }
-
-    private String format(Map.Entry<Thread, StackTraceElement[]> leak) {
-        String stackTrace =
-                Arrays.stream(leak.getValue())
-                        .map(StackTraceElement::toString)
-                        .collect(Collectors.joining("\n"));
-        return leak.getKey().getName() + ":\n" + stackTrace;
-    }
-
-    private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
-        return threadStackTrace.getKey().getState() != Thread.State.TERMINATED
-                && threadStackTrace.getKey().getName().contains("kafka-producer-network-thread");
-    }
-
     /**
      * Exposes information about how man records have been emitted overall and finishes after
      * receiving the checkpoint completed event.
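
The private helper deleted above is promoted to a shared, statically imported test utility; the KafkaUtil.java side of the move is the one file of the eleven not shown in this excerpt. A minimal sketch of what KafkaUtil.checkProducerLeak() plausibly looks like, assuming it keeps the deleted method's retry-then-fail logic, becomes static, and handles interruption internally so callers need no throws clause (the exact signature, and whether the Thread#stop call survived the move, are assumptions):

    // Hypothetical sketch, not copied from this commit: KafkaUtil.java's
    // diff is not part of the excerpt above.
    public static void checkProducerLeak() {
        List<Map.Entry<Thread, StackTraceElement[]>> leaks = Collections.emptyList();
        for (int tries = 0; tries < 10; tries++) {
            // snapshot all JVM threads; keep live Kafka producer I/O threads
            leaks =
                    Thread.getAllStackTraces().entrySet().stream()
                            .filter(KafkaUtil::findAliveKafkaThread)
                            .collect(Collectors.toList());
            if (leaks.isEmpty()) {
                return; // nothing leaked, the check passes
            }
            try {
                Thread.sleep(1000); // give producer threads time to wind down
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        fail(
                "Detected producer leaks:\n"
                        + leaks.stream().map(KafkaUtil::format).collect(Collectors.joining("\n\n")));
    }

    private static boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> t) {
        return t.getKey().getState() != Thread.State.TERMINATED
                && t.getKey().getName().contains("kafka-producer-network-thread");
    }

(format would render a thread name plus its stack trace, as in the deleted method.)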

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java

+2 -3

@@ -29,8 +29,6 @@
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 
 import java.util.ArrayList;
@@ -44,13 +42,13 @@
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
 import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
 public class KafkaTransactionLogITCase extends TestLogger {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class);
     private static final String TOPIC_NAME = "kafkaTransactionLogTest";
     private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
 
@@ -63,6 +61,7 @@ public class KafkaTransactionLogITCase extends TestLogger {
     @After
     public void tearDown() {
         openProducers.forEach(Producer::close);
+        checkProducerLeak();
     }
 
     @Test

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java

+42 -36

@@ -63,20 +63,21 @@ void testWriteExceptionWhenKafkaUnavailable() throws Exception {
 
         final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
 
-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup)) {
 
-        writer.write(1, SINK_WRITER_CONTEXT);
+            writer.write(1, SINK_WRITER_CONTEXT);
 
-        KAFKA_CONTAINER.stop();
+            KAFKA_CONTAINER.stop();
 
-        try {
-            writer.getCurrentProducer().flush();
-            assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
-                    .hasRootCauseExactlyInstanceOf(NetworkException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+            try {
+                writer.getCurrentProducer().flush();
+                assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
+                        .hasRootCauseExactlyInstanceOf(NetworkException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
+            }
         }
     }
 
@@ -86,17 +87,18 @@ void testFlushExceptionWhenKafkaUnavailable() throws Exception {
 
         final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
 
-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);
-        writer.write(1, SINK_WRITER_CONTEXT);
-
-        KAFKA_CONTAINER.stop();
-        try {
-            assertThatCode(() -> writer.flush(false))
-                    .hasRootCauseExactlyInstanceOf(NetworkException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+
+            KAFKA_CONTAINER.stop();
+            try {
+                assertThatCode(() -> writer.flush(false))
+                        .hasRootCauseExactlyInstanceOf(NetworkException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
+            }
         }
     }
 
@@ -106,7 +108,7 @@ void testCloseExceptionWhenKafkaUnavailable() throws Exception {
 
         final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
 
-        final KafkaWriter<Integer> writer =
+        KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         properties, DeliveryGuarantee.AT_LEAST_ONCE, metricGroup);
 
@@ -119,6 +121,9 @@ void testCloseExceptionWhenKafkaUnavailable() throws Exception {
             // closing producer resource throws exception first
             assertThatCode(() -> writer.close())
                     .hasRootCauseExactlyInstanceOf(NetworkException.class);
+        } catch (Exception e) {
+            writer.close();
+            throw e;
         } finally {
             KAFKA_CONTAINER.start();
         }
@@ -130,26 +135,27 @@ void testMailboxExceptionWhenKafkaUnavailable() throws Exception {
         SinkInitContext sinkInitContext =
                 new SinkInitContext(createSinkWriterMetricGroup(), timeService, null);
 
-        final KafkaWriter<Integer> writer =
+        try (KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.AT_LEAST_ONCE, sinkInitContext);
+                        properties, DeliveryGuarantee.AT_LEAST_ONCE, sinkInitContext)) {
 
-        KAFKA_CONTAINER.stop();
+            KAFKA_CONTAINER.stop();
 
-        writer.write(1, SINK_WRITER_CONTEXT);
+            writer.write(1, SINK_WRITER_CONTEXT);
 
-        try {
-            writer.getCurrentProducer().flush();
+            try {
+                writer.getCurrentProducer().flush();
 
-            assertThatCode(
-                            () -> {
-                                while (sinkInitContext.getMailboxExecutor().tryYield()) {
-                                    // execute all mails
-                                }
-                            })
-                    .hasRootCauseExactlyInstanceOf(TimeoutException.class);
-        } finally {
-            KAFKA_CONTAINER.start();
+                assertThatCode(
+                                () -> {
+                                    while (sinkInitContext.getMailboxExecutor().tryYield()) {
+                                        // execute all mails
+                                    }
+                                })
+                        .hasRootCauseExactlyInstanceOf(TimeoutException.class);
+            } finally {
+                KAFKA_CONTAINER.start();
+            }
         }
     }
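
Three of the four tests above now hold the writer in try-with-resources, so close() runs even when an assertion throws and no producer network thread survives for the leak check. testCloseExceptionWhenKafkaUnavailable is the deliberate exception, because writer.close() is itself the call under test; there the added catch block closes the writer manually only when an earlier step fails, then rethrows. Condensed from the diff above:

    // Cleanup path of the close-test; writer is deliberately not in
    // try-with-resources because close() is the assertion target.
    try {
        // ... provoke the broker outage, then assert on close() itself ...
        assertThatCode(() -> writer.close())
                .hasRootCauseExactlyInstanceOf(NetworkException.class);
    } catch (Exception e) {
        writer.close(); // added: release the producer if setup failed early
        throw e;
    } finally {
        KAFKA_CONTAINER.start();
    }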

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

+9

@@ -165,6 +165,9 @@ void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
                 .as("the exception is not thrown again")
                 .doesNotThrowAnyException();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
     }
 
     @Test
@@ -191,6 +194,9 @@ void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception {
                 .as("the exception is not thrown again")
                 .doesNotThrowAnyException();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
     }
 
     @Test
@@ -225,6 +231,9 @@ void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
                 .as("the exception is not thrown again")
                 .doesNotThrowAnyException();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
+
+        // async exception is checked and thrown on close
+        assertThatCode(writer::close).hasRootCauseInstanceOf(ProducerFencedException.class);
     }
 
     @Test
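
Each of the three async-error tests now finishes by closing the writer and asserting that the previously recorded failure resurfaces with ProducerFencedException as its root cause. That pins down the writer's contract (an async callback error is rethrown on close rather than swallowed) and also disposes of the producer, keeping the @AfterEach leak check added in KafkaWriterTestBase (next file) green. A common shape for this kind of deferred propagation, offered only as a hypothetical sketch and not as KafkaWriter's actual implementation:

    // Hypothetical: the producer callback records the failure; close()
    // rethrows it but releases the producer either way.
    private volatile Exception asyncProducerException;

    @Override
    public void close() throws Exception {
        try {
            if (asyncProducerException != null) {
                throw new FlinkRuntimeException(
                        "Failed to send data to Kafka", asyncProducerException);
            }
        } finally {
            producer.close(); // no leaked kafka-producer-network-thread
        }
    }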

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java

+7

@@ -36,6 +36,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
@@ -55,6 +56,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Consumer;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 
 /** Test base for KafkaWriter. */
@@ -84,6 +86,11 @@ public void setUp(TestInfo testInfo) {
         topic = testInfo.getDisplayName().replaceAll("\\W", "");
     }
 
+    @AfterEach
+    public void check() {
+        checkProducerLeak();
+    }
+
     protected KafkaWriter<Integer> createWriterWithConfiguration(
             Properties config, DeliveryGuarantee guarantee) throws IOException {
         return createWriterWithConfiguration(config, guarantee, createSinkWriterMetricGroup());

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java

+8

@@ -23,10 +23,13 @@
 import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
 
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
+
 /**
  * An IT case class that runs all the IT cases of the legacy {@link
  * org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer} with the new {@link KafkaSource}.
@@ -44,6 +47,11 @@ public static void prepare() throws Exception {
                 .setProducerSemantic(FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
     }
 
+    @After
+    public void check() {
+        checkProducerLeak();
+    }
+
     @Test
     public void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java

+2 -2

@@ -24,9 +24,9 @@
  */
 public class DockerImageVersions {
 
-    public static final String KAFKA = "confluentinc/cp-kafka:7.4.4";
+    public static final String KAFKA = "confluentinc/cp-kafka:7.7.1";
 
-    public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.4.4";
+    public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.7.1";
 
     public static final String ZOOKEEPER = "zookeeper:3.4.14";
 }
