Skip to content

Commit ea3a641

Browse files
qbx2AHeise
authored andcommitted
[FLINK-33201][Connectors/Kafka] Fix memory leak in CachingTopicSelector
1 parent b5b8076 commit ea3a641

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ private static class CachingTopicSelector<IN> implements Function<IN, String>, S
284284
public String apply(IN in) {
285285
final String topic = cache.getOrDefault(in, topicSelector.apply(in));
286286
cache.put(in, topic);
287-
if (cache.size() == CACHE_RESET_SIZE) {
287+
if (cache.size() >= CACHE_RESET_SIZE) {
288288
cache.clear();
289289
}
290290
return topic;

0 commit comments

Comments
 (0)