Skip to content

Commit 79f1dae

Browse files
committed
Addressing Fabian's review
1 parent 8c266e8 commit 79f1dae

File tree

3 files changed

+28
-12
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
/** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */
2424
public class KafkaAssertjConfiguration extends Configuration {
2525
public KafkaAssertjConfiguration() {
26+
// in case of an assertion error, show the full stack trace
27+
// for green builds, this does not change anything
2628
setMaxStackTraceElementsDisplayed(10000);
2729
}
2830
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java

+25-11
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.apache.flink.configuration.CoreOptions;
2424
import org.apache.flink.core.execution.JobClient;
2525
import org.apache.flink.core.execution.SavepointFormatType;
26-
import org.apache.flink.core.testutils.FlinkAssertions;
2726
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
27+
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
2828
import org.apache.flink.streaming.api.datastream.DataStream;
2929
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3030
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -41,7 +41,9 @@
4141
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
4242
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4343
import org.apache.kafka.common.TopicPartition;
44+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4445
import org.assertj.core.api.Assertions;
46+
import org.assertj.core.api.ThrowingConsumer;
4547
import org.junit.Before;
4648
import org.junit.Test;
4749
import org.junit.runner.RunWith;
@@ -66,6 +68,7 @@
6668
import java.util.stream.IntStream;
6769

6870
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
71+
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
6972
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
7073
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
7174
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
@@ -1318,7 +1321,7 @@ public void testStartFromGroupOffsetsEarliest() throws Exception {
13181321
@Test
13191322
public void testStartFromGroupOffsetsNone() {
13201323
Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy())
1321-
.satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
1324+
.satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
13221325
}
13231326

13241327
private List<String> appendNewData(
@@ -1513,20 +1516,31 @@ private static boolean isCausedByJobFinished(Throwable e) {
15131516
}
15141517

15151518
private void cleanupTopic(String topic) {
1516-
ignoreExceptions(() -> deleteTestTopic(topic));
1519+
ignoreExceptions(
1520+
() -> deleteTestTopic(topic),
1521+
anyCauseMatches(UnknownTopicOrPartitionException.class));
15171522
}
15181523

1519-
private static void ignoreExceptions(RunnableWithException e) {
1520-
try {
1521-
e.run();
1522-
} catch (Exception ex) {
1523-
// ignore
1524+
private static void cancelJob(TableResult tableResult) {
1525+
if (tableResult != null && tableResult.getJobClient().isPresent()) {
1526+
ignoreExceptions(
1527+
() -> tableResult.getJobClient().get().cancel().get(),
1528+
anyCauseMatches(FlinkJobTerminatedWithoutCancellationException.class),
1529+
anyCauseMatches(
1530+
"MiniCluster is not yet running or has already been shut down."));
15241531
}
15251532
}
15261533

1527-
private static void cancelJob(TableResult tableResult) {
1528-
if (tableResult != null) {
1529-
ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel));
1534+
@SafeVarargs
1535+
private static void ignoreExceptions(
1536+
RunnableWithException runnable, ThrowingConsumer<? super Throwable>... ignoreIf) {
1537+
try {
1538+
runnable.run();
1539+
} catch (InterruptedException e) {
1540+
Thread.currentThread().interrupt();
1541+
} catch (Exception ex) {
1542+
// check if the exception is one of the ignored ones
1543+
assertThat(ex).satisfiesAnyOf(ignoreIf);
15301544
}
15311545
}
15321546
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
* A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link
3636
* TypeSerializerSchemaCompatibility}.
3737
*
38-
* <p>Note copied from Flink 1.18. Remove when we drop 1.18 support.
38+
* <p>Note copied from Flink 1.19. Remove when we drop 1.19 support.
3939
*/
4040
public final class TypeSerializerMatchers {
4141

0 commit comments

Comments (0)