Skip to content

Commit ab8d968

Browse files
committed
Addressing Fabian's review
1 parent 8c266e8 commit ab8d968

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/KafkaAssertjConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
/** Configuration for AssertJ that shows full stack traces for unmatched exceptions. */
2424
public class KafkaAssertjConfiguration extends Configuration {
2525
public KafkaAssertjConfiguration() {
26+
// in case of an assertion error, show the full stack trace
27+
// for green builds, this is not changing anything
2628
setMaxStackTraceElementsDisplayed(10000);
2729
}
2830
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.configuration.CoreOptions;
2424
import org.apache.flink.core.execution.JobClient;
2525
import org.apache.flink.core.execution.SavepointFormatType;
26-
import org.apache.flink.core.testutils.FlinkAssertions;
2726
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
2827
import org.apache.flink.streaming.api.datastream.DataStream;
2928
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,11 +35,13 @@
3635
import org.apache.flink.table.utils.EncodingUtils;
3736
import org.apache.flink.test.util.SuccessException;
3837
import org.apache.flink.types.Row;
38+
import org.apache.flink.util.FlinkException;
3939
import org.apache.flink.util.function.RunnableWithException;
4040

4141
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
4242
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4343
import org.apache.kafka.common.TopicPartition;
44+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4445
import org.assertj.core.api.Assertions;
4546
import org.junit.Before;
4647
import org.junit.Test;
@@ -66,6 +67,8 @@
6667
import java.util.stream.IntStream;
6768

6869
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
70+
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
71+
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
6972
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
7073
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
7174
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
@@ -1318,7 +1321,7 @@ public void testStartFromGroupOffsetsEarliest() throws Exception {
13181321
@Test
13191322
public void testStartFromGroupOffsetsNone() {
13201323
Assertions.assertThatThrownBy(() -> testStartFromGroupOffsetsWithNoneResetStrategy())
1321-
.satisfies(FlinkAssertions.anyCauseMatches(NoOffsetForPartitionException.class));
1324+
.satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
13221325
}
13231326

13241327
private List<String> appendNewData(
@@ -1513,20 +1516,28 @@ private static boolean isCausedByJobFinished(Throwable e) {
15131516
}
15141517

15151518
private void cleanupTopic(String topic) {
1516-
ignoreExceptions(() -> deleteTestTopic(topic));
1519+
ignoreExceptions(() -> deleteTestTopic(topic), UnknownTopicOrPartitionException.class);
15171520
}
15181521

1519-
private static void ignoreExceptions(RunnableWithException e) {
1522+
@SafeVarargs
1523+
private static void ignoreExceptions(
1524+
RunnableWithException runnable, Class<? extends Exception>... exClasses) {
15201525
try {
1521-
e.run();
1526+
runnable.run();
1527+
} catch (InterruptedException e) {
1528+
Thread.currentThread().interrupt();
15221529
} catch (Exception ex) {
1523-
// ignore
1530+
// check if the exception is one of the ignored ones
1531+
assertThatChainOfCauses(ex)
1532+
.anyMatch(
1533+
cause -> Arrays.stream(exClasses).anyMatch(cl -> cl.isInstance(cause)));
15241534
}
15251535
}
15261536

15271537
private static void cancelJob(TableResult tableResult) {
1528-
if (tableResult != null) {
1529-
ignoreExceptions(() -> tableResult.getJobClient().ifPresent(JobClient::cancel));
1538+
if (tableResult != null && tableResult.getJobClient().isPresent()) {
1539+
ignoreExceptions(
1540+
() -> tableResult.getJobClient().get().cancel().get(), FlinkException.class);
15301541
}
15311542
}
15321543
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerMatchers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
* A Collection of useful {@link Matcher}s for {@link TypeSerializer} and {@link
3636
* TypeSerializerSchemaCompatibility}.
3737
*
38-
* <p>Note copied from Flink 1.18. Remove when we drop 1.18 support.
38+
* <p>Note copied from Flink 1.19. Remove when we drop 1.19 support.
3939
*/
4040
public final class TypeSerializerMatchers {
4141

0 commit comments

Comments (0)