Skip to content

Commit 5b68f06

Browse files
committed
[FLINK-36177] Deprecate KafkaShuffle and more
This commit deprecates all classes that are slated for removal in the kafka-4.0 release compatible with Flink 2.0. I also deprecated internal classes to make later removal easier. Some public classes will cease to be public API but are still internally used.
1 parent 9b97c51 commit 5b68f06

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+79
-34
lines changed

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.streaming.connectors.kafka;
1919

20+
import org.apache.commons.collections.map.LinkedMap;
2021
import org.apache.flink.annotation.Internal;
2122
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.api.common.ExecutionConfig;
@@ -58,8 +59,6 @@
5859
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
5960
import org.apache.flink.util.ExceptionUtils;
6061
import org.apache.flink.util.SerializedValue;
61-
62-
import org.apache.commons.collections.map.LinkedMap;
6362
import org.apache.kafka.clients.consumer.ConsumerConfig;
6463
import org.slf4j.Logger;
6564
import org.slf4j.LoggerFactory;
@@ -90,6 +89,7 @@
9089
* @param <T> The type of records produced by this data source
9190
*/
9291
@Internal
92+
@Deprecated
9393
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
9494
implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
9595

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaErrorCode.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@
1919

2020
import org.apache.flink.annotation.PublicEvolving;
2121

22-
/** Error codes used in {@link FlinkKafkaException}. */
22+
/**
23+
* Error codes used in {@link FlinkKafkaException}.
24+
*
25+
* @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link
26+
* org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}.
27+
*/
2328
@PublicEvolving
29+
@Deprecated
2430
public enum FlinkKafkaErrorCode {
2531
PRODUCERS_POOL_EMPTY,
2632
EXTERNAL_ERROR

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaException.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,14 @@
2020
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.util.FlinkException;
2222

23-
/** Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}. */
23+
/**
24+
* Exception used by {@link FlinkKafkaProducer} and {@link FlinkKafkaConsumer}.
25+
*
26+
* @deprecated Will be removed with {@link FlinkKafkaProducer} and {@link
27+
* org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle}.
28+
*/
2429
@PublicEvolving
30+
@Deprecated
2531
public class FlinkKafkaException extends FlinkException {
2632

2733
private static final long serialVersionUID = 920269130311214200L;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaContextAware.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
*
2727
* <p>You only need to override the methods for the information that you need. However, {@link
2828
* #getTargetTopic(Object)} is required because it is used to determine the available partitions.
29+
*
30+
* @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed.
2931
*/
3032
@PublicEvolving
33+
@Deprecated
3134
public interface KafkaContextAware<T> {
3235

3336
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.api.common.serialization.DeserializationSchema;
2222
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2323
import org.apache.flink.util.Collector;
24-
2524
import org.apache.kafka.clients.consumer.ConsumerRecord;
2625

2726
import java.io.Serializable;
@@ -31,8 +30,10 @@
3130
* (Java/Scala objects) that are processed by Flink.
3231
*
3332
* @param <T> The type created by the keyed deserialization schema.
33+
* @deprecated Will be turned into internal API when {@link FlinkKafkaConsumer} is removed.
3434
*/
3535
@PublicEvolving
36+
@Deprecated
3637
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
3738

3839
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919

2020
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
22-
2322
import org.apache.kafka.clients.producer.ProducerRecord;
2423

2524
import javax.annotation.Nullable;
26-
2725
import java.io.Serializable;
2826

2927
/**
@@ -35,8 +33,10 @@
3533
* which the Kafka Producer is running.
3634
*
3735
* @param <T> the type of values being serialized
36+
* @deprecated Will be turned into internal API when {@link FlinkKafkaProducer} is removed.
3837
*/
3938
@PublicEvolving
39+
@Deprecated
4040
public interface KafkaSerializationSchema<T> extends Serializable {
4141

4242
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* <p>The exact value of this is determined at runtime in the consumer subtasks.
2727
*/
2828
@Internal
29+
@Deprecated
2930
public enum OffsetCommitMode {
3031

3132
/** Completely disable offset committing. */

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/OffsetCommitModes.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/** Utilities for {@link OffsetCommitMode}. */
2323
@Internal
24+
@Deprecated
2425
public class OffsetCommitModes {
2526

2627
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.util.SerializedValue;
3333

3434
import javax.annotation.Nonnull;
35-
3635
import java.io.IOException;
3736
import java.util.HashMap;
3837
import java.util.List;
@@ -61,6 +60,7 @@
6160
* @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
6261
*/
6362
@Internal
63+
@Deprecated
6464
public abstract class AbstractFetcher<T, KPH> {
6565

6666
private static final int NO_TIMESTAMPS_WATERMARKS = 0;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* allows the discoverer to be interrupted during a {@link #discoverPartitions()} call.
4242
*/
4343
@Internal
44+
@Deprecated
4445
public abstract class AbstractPartitionDiscoverer {
4546

4647
/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* @param <E> The type of elements in the queue.
5050
*/
5151
@Internal
52+
@Deprecated
5253
public class ClosableBlockingQueue<E> {
5354

5455
/** The lock used to make queue accesses and open checks atomic. */

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222

2323
import javax.annotation.Nullable;
24-
2524
import java.util.concurrent.atomic.AtomicReference;
2625

2726
/**
@@ -65,6 +64,7 @@
6564
* }</pre>
6665
*/
6766
@Internal
67+
@Deprecated
6868
public class ExceptionProxy {
6969

7070
/** The thread that should be interrupted when an exception occurs. */

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.PublicEvolving;
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.util.Preconditions;
24-
2524
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
2625
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2726
import org.apache.kafka.clients.producer.Callback;
@@ -43,7 +42,6 @@
4342
import org.slf4j.LoggerFactory;
4443

4544
import javax.annotation.Nullable;
46-
4745
import java.lang.reflect.Constructor;
4846
import java.lang.reflect.Field;
4947
import java.lang.reflect.InvocationTargetException;
@@ -58,6 +56,7 @@
5856

5957
/** Internal flink kafka producer. */
6058
@PublicEvolving
59+
@Deprecated
6160
public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
6261
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
6362

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Handover.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.util.ExceptionUtils;
23-
2423
import org.apache.kafka.clients.consumer.ConsumerRecords;
2524

2625
import javax.annotation.Nonnull;
2726
import javax.annotation.concurrent.ThreadSafe;
28-
2927
import java.io.Closeable;
3028

3129
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,6 +45,7 @@
4745
*/
4846
@ThreadSafe
4947
@Internal
48+
@Deprecated
5049
public final class Handover implements Closeable {
5150

5251
private final Object lock = new Object();

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* commit request completes, which should normally be triggered from checkpoint complete event.
2626
*/
2727
@Internal
28+
@Deprecated
2829
public interface KafkaCommitCallback {
2930

3031
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.java.tuple.Tuple2;
2424
import org.apache.flink.metrics.MetricGroup;
2525
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
26-
2726
import org.apache.kafka.clients.consumer.ConsumerRecords;
2827
import org.apache.kafka.clients.consumer.KafkaConsumer;
2928
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -35,7 +34,6 @@
3534
import org.slf4j.Logger;
3635

3736
import javax.annotation.Nonnull;
38-
3937
import java.time.Duration;
4038
import java.util.ArrayList;
4139
import java.util.Collections;
@@ -61,6 +59,7 @@
6159
* an indirection to the KafkaConsumer calls that change signature.
6260
*/
6361
@Internal
62+
@Deprecated
6463
public class KafkaConsumerThread<T> extends Thread {
6564

6665
/** Logger for this consumer. */

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.api.common.typeinfo.TypeInformation;
2323
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
2424
import org.apache.flink.util.Collector;
25-
2625
import org.apache.kafka.clients.consumer.ConsumerRecord;
2726

2827
/**
@@ -32,6 +31,7 @@
3231
* @param <T> The type created by the deserialization schema.
3332
*/
3433
@Internal
34+
@Deprecated
3535
public class KafkaDeserializationSchemaWrapper<T> implements KafkaDeserializationSchema<T> {
3636

3737
private static final long serialVersionUID = 2651665280744549932L;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.util.Collector;
2727
import org.apache.flink.util.ExceptionUtils;
2828
import org.apache.flink.util.SerializedValue;
29-
3029
import org.apache.kafka.clients.consumer.ConsumerRecord;
3130
import org.apache.kafka.clients.consumer.ConsumerRecords;
3231
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -35,7 +34,6 @@
3534
import org.slf4j.LoggerFactory;
3635

3736
import javax.annotation.Nonnull;
38-
3937
import java.util.ArrayDeque;
4038
import java.util.HashMap;
4139
import java.util.List;
@@ -51,6 +49,7 @@
5149
* @param <T> The type of elements produced by the fetcher.
5250
*/
5351
@Internal
52+
@Deprecated
5453
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
5554

5655
private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionDiscoverer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.streaming.connectors.kafka.internals;
1919

2020
import org.apache.flink.annotation.Internal;
21-
2221
import org.apache.kafka.clients.consumer.KafkaConsumer;
2322
import org.apache.kafka.common.PartitionInfo;
2423

@@ -34,6 +33,7 @@
3433
* brokers via the Kafka high-level consumer API.
3534
*/
3635
@Internal
36+
@Deprecated
3737
public class KafkaPartitionDiscoverer extends AbstractPartitionDiscoverer {
3838

3939
private final Properties kafkaProperties;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
2424
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
2525
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
26-
2726
import org.apache.kafka.clients.producer.ProducerRecord;
2827

2928
import javax.annotation.Nullable;
@@ -35,6 +34,7 @@
3534
* KafkaSerializationSchema}.
3635
*/
3736
@Internal
37+
@Deprecated
3838
public class KafkaSerializationSchemaWrapper<T>
3939
implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
4040

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
3333
import org.apache.flink.util.Preconditions;
3434
import org.apache.flink.util.SerializedValue;
35-
3635
import org.apache.kafka.clients.consumer.ConsumerRecord;
3736
import org.apache.kafka.common.TopicPartition;
3837

@@ -50,6 +49,7 @@
5049

5150
/** Fetch data from Kafka for Kafka Shuffle. */
5251
@Internal
52+
@Deprecated
5353
public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> {
5454
/** The handler to check and generate watermarks from fetched records. * */
5555
private final WatermarkHandler watermarkHandler;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java

+4
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@
3232
*
3333
* <p>Note: This class must not change in its structure, because it would change the serialization
3434
* format and make previous savepoints unreadable.
35+
*
36+
* @deprecated Will be turned into internal class when {@link
37+
* org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
3538
*/
3639
@PublicEvolving
40+
@Deprecated
3741
public final class KafkaTopicPartition implements Serializable {
3842

3943
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/** Utility for assigning Kafka partitions to consumer subtasks. */
2323
@Internal
24+
@Deprecated
2425
public class KafkaTopicPartitionAssigner {
2526

2627
/**

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.streaming.connectors.kafka.internals;
1919

2020
import org.apache.flink.annotation.Internal;
21-
2221
import org.apache.kafka.common.Node;
2322

2423
import java.io.Serializable;
@@ -27,6 +26,7 @@
2726
* Serializable Topic Partition info with leader Node information. This class is used at runtime.
2827
*/
2928
@Internal
29+
@Deprecated
3030
public class KafkaTopicPartitionLeader implements Serializable {
3131

3232
private static final long serialVersionUID = 9145855900303748582L;

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
3030
*/
3131
@Internal
32+
@Deprecated
3233
public class KafkaTopicPartitionState<T, KPH> {
3334

3435
// ------------------------------------------------------------------------

Diff for: flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* number that is probably (hopefully) not used by Kafka as a magic number for anything else.
2727
*/
2828
@Internal
29+
@Deprecated
2930
public class KafkaTopicPartitionStateSentinel {
3031

3132
/** Magic number that defines an unset offset. */

0 commit comments

Comments
 (0)