diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index 78a4b0b60..0709afe0b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -474,10 +474,9 @@ private void parseAndSetRequiredProperties() { true); // If the source is bounded, do not run periodic partition discovery. - maybeOverride( - KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), - "-1", - boundedness == Boundedness.BOUNDED); + if (boundedness == Boundedness.BOUNDED) { + maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", true); + } // If the client id prefix is not set, reuse the consumer group id as the client id prefix, // or generate a random string if consumer group id is not specified. diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java index 2829f01e0..ca777bc73 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java @@ -217,6 +217,29 @@ public void testSettingInvalidCustomDeserializers( .hasMessageContaining(expectedError); } + @Test + public void testDefaultPartitionDiscovery() { + final KafkaSource kafkaSource = getBasicBuilder().build(); + // Commit on checkpoint and auto commit should be disabled because group.id is not specified + assertThat( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) + .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue()); + } + + @Test + public void testPeriodPartitionDiscovery() { + final KafkaSource kafkaSource = + getBasicBuilder().setBounded(OffsetsInitializer.latest()).build(); + // Commit on checkpoint and auto commit should be disabled because group.id is not specified + assertThat( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) + .isEqualTo(-1L); + } + private KafkaSourceBuilder getBasicBuilder() { return new KafkaSourceBuilder() .setBootstrapServers("testServer")