Skip to content

Commit ced8771

Browse files
committed
[FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartitioner
Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a functional interface.
1 parent 12faf7d commit ced8771

File tree

7 files changed

+87
-51
lines changed

7 files changed

+87
-51
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.kafka.sink;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
26+
* Kafka topics.
27+
*/
28+
@PublicEvolving
29+
public interface KafkaPartitioner<T> extends Serializable {
30+
/**
31+
* Initializer for the partitioner. This is called once on each parallel sink instance of the
32+
* Flink Kafka producer. This method should be overridden if necessary.
33+
*
34+
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
35+
* @param parallelInstances the total number of parallel instances
36+
*/
37+
default void open(int parallelInstanceId, int parallelInstances) {}
38+
39+
/**
40+
* Determine the id of the partition that the record should be written to.
41+
*
42+
* @param record the record value
43+
* @param key serialized key of the record
44+
* @param value serialized value of the record
45+
* @param targetTopic target topic for the record
46+
* @param partitions found partitions for the target topic
47+
* @return the id of the target partition
48+
*/
49+
int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
50+
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
8282

8383
@Nullable private Function<? super IN, String> topicSelector;
8484
@Nullable private SerializationSchema<? super IN> valueSerializationSchema;
85-
@Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
85+
@Nullable private KafkaPartitioner<? super IN> partitioner;
8686
@Nullable private SerializationSchema<? super IN> keySerializationSchema;
8787
@Nullable private HeaderProvider<? super IN> headerProvider;
8888

@@ -91,6 +91,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
9191
*
9292
* @param partitioner
9393
* @return {@code this}
94+
* @deprecated use {@link #setPartitioner(KafkaPartitioner)}
9495
*/
9596
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
9697
FlinkKafkaPartitioner<? super T> partitioner) {
@@ -99,6 +100,19 @@ public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
99100
return self;
100101
}
101102

103+
/**
104+
* Sets a custom partitioner determining the target partition of the target topic.
105+
*
106+
* @param partitioner
107+
* @return {@code this}
108+
*/
109+
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
110+
KafkaPartitioner<? super T> partitioner) {
111+
KafkaRecordSerializationSchemaBuilder<T> self = self();
112+
self.partitioner = checkNotNull(partitioner);
113+
return self;
114+
}
115+
102116
/**
103117
* Sets a fixed topic which is used as the destination for all records.
104118
*
@@ -295,15 +309,15 @@ private static class KafkaRecordSerializationSchemaWrapper<IN>
295309
implements KafkaRecordSerializationSchema<IN> {
296310
private final SerializationSchema<? super IN> valueSerializationSchema;
297311
private final Function<? super IN, String> topicSelector;
298-
private final FlinkKafkaPartitioner<? super IN> partitioner;
312+
private final KafkaPartitioner<? super IN> partitioner;
299313
private final SerializationSchema<? super IN> keySerializationSchema;
300314
private final HeaderProvider<? super IN> headerProvider;
301315

302316
KafkaRecordSerializationSchemaWrapper(
303317
Function<? super IN, String> topicSelector,
304318
SerializationSchema<? super IN> valueSerializationSchema,
305319
@Nullable SerializationSchema<? super IN> keySerializationSchema,
306-
@Nullable FlinkKafkaPartitioner<? super IN> partitioner,
320+
@Nullable KafkaPartitioner<? super IN> partitioner,
307321
@Nullable HeaderProvider<? super IN> headerProvider) {
308322
this.topicSelector = checkNotNull(topicSelector);
309323
this.valueSerializationSchema = checkNotNull(valueSerializationSchema);

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java

+4-32
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,15 @@
1818
package org.apache.flink.streaming.connectors.kafka.partitioner;
1919

2020
import org.apache.flink.annotation.PublicEvolving;
21-
22-
import java.io.Serializable;
21+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2322

2423
/**
2524
* A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
2625
* multiple Kafka topics.
2726
*
28-
* @deprecated Will be turned into internal class when {@link
29-
* org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
27+
* @deprecated Use {@link KafkaPartitioner} instead for {@link
28+
* org.apache.flink.connector.kafka.sink.KafkaSink}.
3029
*/
3130
@PublicEvolving
3231
@Deprecated
33-
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
34-
35-
private static final long serialVersionUID = -9086719227828020494L;
36-
37-
/**
38-
* Initializer for the partitioner. This is called once on each parallel sink instance of the
39-
* Flink Kafka producer. This method should be overridden if necessary.
40-
*
41-
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
42-
* @param parallelInstances the total number of parallel instances
43-
*/
44-
public void open(int parallelInstanceId, int parallelInstances) {
45-
// overwrite this method if needed.
46-
}
47-
48-
/**
49-
* Determine the id of the partition that the record should be written to.
50-
*
51-
* @param record the record value
52-
* @param key serialized key of the record
53-
* @param value serialized value of the record
54-
* @param targetTopic target topic for the record
55-
* @param partitions found partitions for the target topic
56-
* @return the id of the target partition
57-
*/
58-
public abstract int partition(
59-
T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
60-
}
32+
public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {}

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
22+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2223
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
2324
import org.apache.flink.connector.kafka.sink.KafkaSink;
24-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2525
import org.apache.flink.table.data.GenericRowData;
2626
import org.apache.flink.table.data.RowData;
2727
import org.apache.flink.types.RowKind;
@@ -46,7 +46,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
4646

4747
private final Set<String> topics;
4848
private final Pattern topicPattern;
49-
private final FlinkKafkaPartitioner<RowData> partitioner;
49+
private final KafkaPartitioner<RowData> partitioner;
5050
@Nullable private final SerializationSchema<RowData> keySerialization;
5151
private final SerializationSchema<RowData> valueSerialization;
5252
private final RowData.FieldGetter[] keyFieldGetters;
@@ -59,7 +59,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
5959
DynamicKafkaRecordSerializationSchema(
6060
@Nullable List<String> topics,
6161
@Nullable Pattern topicPattern,
62-
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
62+
@Nullable KafkaPartitioner<RowData> partitioner,
6363
@Nullable SerializationSchema<RowData> keySerialization,
6464
SerializationSchema<RowData> valueSerialization,
6565
RowData.FieldGetter[] keyFieldGetters,

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.configuration.ReadableConfig;
2626
import org.apache.flink.connector.base.DeliveryGuarantee;
27+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2728
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
2829
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
2930
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3031
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
31-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3232
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
3333
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
3434
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
@@ -386,7 +386,7 @@ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
386386
* The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
387387
* name.
388388
*/
389-
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
389+
public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
390390
ReadableConfig tableOptions, ClassLoader classLoader) {
391391
return tableOptions
392392
.getOptional(SINK_PARTITIONER)
@@ -465,19 +465,19 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
465465
}
466466

467467
/** Returns a class value with the given class name. */
468-
private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
468+
private static <T> KafkaPartitioner<T> initializePartitioner(
469469
String name, ClassLoader classLoader) {
470470
try {
471471
Class<?> clazz = Class.forName(name, true, classLoader);
472-
if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
472+
if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
473473
throw new ValidationException(
474474
String.format(
475-
"Sink partitioner class '%s' should extend from the required class %s",
476-
name, FlinkKafkaPartitioner.class.getName()));
475+
"Sink partitioner class '%s' should implement the required interface %s",
476+
name, KafkaPartitioner.class.getName()));
477477
}
478478
@SuppressWarnings("unchecked")
479-
final FlinkKafkaPartitioner<T> kafkaPartitioner =
480-
InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
479+
final KafkaPartitioner<T> kafkaPartitioner =
480+
InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
481481

482482
return kafkaPartitioner;
483483
} catch (ClassNotFoundException | FlinkException e) {

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.flink.api.common.typeinfo.TypeInformation;
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
2626
import org.apache.flink.connector.base.DeliveryGuarantee;
27+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2728
import org.apache.flink.connector.kafka.sink.KafkaSink;
2829
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
2930
import org.apache.flink.streaming.api.datastream.DataStream;
3031
import org.apache.flink.streaming.api.datastream.DataStreamSink;
31-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3232
import org.apache.flink.table.api.DataTypes;
3333
import org.apache.flink.table.connector.ChangelogMode;
3434
import org.apache.flink.table.connector.Projection;
@@ -125,7 +125,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
125125
protected final Properties properties;
126126

127127
/** Partitioner to select Kafka partition for each item. */
128-
protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
128+
protected final @Nullable KafkaPartitioner<RowData> partitioner;
129129

130130
/**
131131
* Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
@@ -150,7 +150,7 @@ public KafkaDynamicSink(
150150
@Nullable List<String> topics,
151151
@Nullable Pattern topicPattern,
152152
Properties properties,
153-
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
153+
@Nullable KafkaPartitioner<RowData> partitioner,
154154
DeliveryGuarantee deliveryGuarantee,
155155
boolean upsertMode,
156156
SinkBufferFlushMode flushMode,

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.configuration.Configuration;
2727
import org.apache.flink.configuration.ReadableConfig;
2828
import org.apache.flink.connector.base.DeliveryGuarantee;
29+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2930
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
3031
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
3132
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
3233
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
33-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3434
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
3535
import org.apache.flink.table.api.ValidationException;
3636
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -427,7 +427,7 @@ protected KafkaDynamicSink createKafkaTableSink(
427427
@Nullable List<String> topics,
428428
@Nullable Pattern topicPattern,
429429
Properties properties,
430-
FlinkKafkaPartitioner<RowData> partitioner,
430+
KafkaPartitioner<RowData> partitioner,
431431
DeliveryGuarantee deliveryGuarantee,
432432
Integer parallelism,
433433
@Nullable String transactionalIdPrefix) {

0 commit comments

Comments
 (0)