From c4d6662fed38fd3e0278b2a1f2595e6741e01294 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Thu, 29 Aug 2024 10:32:25 +0200 Subject: [PATCH] [FLINK-35886] Leave a note for future Flink 2.0 upgrade Fixing FLIP-471 in the old source would require some extra work, that we hopefully can avoid by removing the old source before doing the upgrade. --- .../connectors/kafka/internals/AbstractFetcher.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 841d45288..41b5ad24c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -435,6 +435,14 @@ private List> createPartitionStateHolders( kafkaHandle, deserializedWatermarkStrategy.createTimestampAssigner( () -> consumerMetricGroup), + // When upgrading to Flink 2.0, context has to provide also + // the input activity clock. This is not trivial for the old + // sources. Ideally we should drop this old source before + // this connector is upgraded to Flink 2.0. Otherwise, we + // can avoid the compilation error without fixing the bug + // addressed by the FLIP-471, by returning SystemClock, + // which would reproduce the pre-FLIP-471 behavior (without + // fixing the underlying bug). deserializedWatermarkStrategy.createWatermarkGenerator( () -> consumerMetricGroup), immediateOutput,