Skip to content

Commit 9b97c51

Browse files
committed
[hotfix] Fix FlinkKafkaConsumerBaseTest.testClosePartitionDiscovererWithCancellation
Make a copy of AbstractPartitionDiscoverer#getAllTopics before modifying it.
1 parent 2ee9b9a commit 9b97c51

File tree

1 file changed

+10
-19
lines changed

1 file changed

+10
-19
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java

+10-19
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121

22+
import java.util.ArrayList;
2223
import java.util.HashSet;
23-
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Set;
2626

@@ -130,21 +130,18 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
130130
// topics or a topic pattern
131131
if (topicsDescriptor.isFixedTopics()) {
132132
newDiscoveredPartitions =
133-
getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
133+
new ArrayList<>(
134+
getAllPartitionsForTopics(topicsDescriptor.getFixedTopics()));
134135
} else {
135-
List<String> matchedTopics = getAllTopics();
136+
List<String> matchedTopics = new ArrayList<>(getAllTopics());
136137

137138
// retain topics that match the pattern
138-
Iterator<String> iter = matchedTopics.iterator();
139-
while (iter.hasNext()) {
140-
if (!topicsDescriptor.isMatchingTopic(iter.next())) {
141-
iter.remove();
142-
}
143-
}
139+
matchedTopics.removeIf(s -> !topicsDescriptor.isMatchingTopic(s));
144140

145-
if (matchedTopics.size() != 0) {
141+
if (!matchedTopics.isEmpty()) {
146142
// get partitions only for matched topics
147-
newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
143+
newDiscoveredPartitions =
144+
new ArrayList<>(getAllPartitionsForTopics(matchedTopics));
148145
} else {
149146
newDiscoveredPartitions = null;
150147
}
@@ -157,14 +154,8 @@ public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, Cl
157154
"Unable to retrieve any partitions with KafkaTopicsDescriptor: "
158155
+ topicsDescriptor);
159156
} else {
160-
Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
161-
KafkaTopicPartition nextPartition;
162-
while (iter.hasNext()) {
163-
nextPartition = iter.next();
164-
if (!setAndCheckDiscoveredPartition(nextPartition)) {
165-
iter.remove();
166-
}
167-
}
157+
newDiscoveredPartitions.removeIf(
158+
nextPartition -> !setAndCheckDiscoveredPartition(nextPartition));
168159
}
169160

170161
return newDiscoveredPartitions;

0 commit comments

Comments
 (0)