diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 841d45288..41b5ad24c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -435,6 +435,14 @@ private List> createPartitionStateHolders( kafkaHandle, deserializedWatermarkStrategy.createTimestampAssigner( () -> consumerMetricGroup), + // When upgrading to Flink 2.0, context has to provide also + // the input activity clock. This is not trivial for the old + // sources. Ideally we should drop this old source before + // this connector is upgraded to Flink 2.0. Otherwise, we + // can avoid the compilation error without fixing the bug + // addressed by the FLIP-471, by returning SystemClock, + // which would reproduce the pre-FLIP-471 behavior (without + // fixing the underlying bug). deserializedWatermarkStrategy.createWatermarkGenerator( () -> consumerMetricGroup), immediateOutput,