Skip to content

Commit 1b914b3

Browse files
committed
[FLINK-36177] Introduce KafkaPartitioner to replace FlinkKafkaPartitioner
Relocate FlinkKafkaPartitioner to KafkaSink package and turn it into a functional interface.
1 parent 5b68f06 commit 1b914b3

File tree

7 files changed

+87
-57
lines changed

7 files changed

+87
-57
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.kafka.sink;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* A {@code KafkaPartitioner} wraps logic on how to partition records across partitions of multiple
26+
* Kafka topics.
27+
*/
28+
@PublicEvolving
29+
public interface KafkaPartitioner<T> extends Serializable {
30+
/**
31+
* Initializer for the partitioner. This is called once on each parallel sink instance of the
32+
* Flink Kafka producer. This method should be overridden if necessary.
33+
*
34+
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
35+
* @param parallelInstances the total number of parallel instances
36+
*/
37+
default void open(int parallelInstanceId, int parallelInstances) {}
38+
39+
/**
40+
* Determine the id of the partition that the record should be written to.
41+
*
42+
* @param record the record value
43+
* @param key serialized key of the record
44+
* @param value serialized value of the record
45+
* @param targetTopic target topic for the record
46+
* @param partitions found partitions for the target topic
47+
* @return the id of the target partition
48+
*/
49+
int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
50+
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020
import org.apache.flink.annotation.PublicEvolving;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
2222
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
23-
2423
import org.apache.kafka.clients.producer.ProducerRecord;
2524
import org.apache.kafka.common.serialization.Serializer;
2625

2726
import javax.annotation.Nullable;
28-
2927
import java.io.Serializable;
3028
import java.util.HashMap;
3129
import java.util.Map;
@@ -82,7 +80,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
8280

8381
@Nullable private Function<? super IN, String> topicSelector;
8482
@Nullable private SerializationSchema<? super IN> valueSerializationSchema;
85-
@Nullable private FlinkKafkaPartitioner<? super IN> partitioner;
83+
@Nullable private KafkaPartitioner<? super IN> partitioner;
8684
@Nullable private SerializationSchema<? super IN> keySerializationSchema;
8785
@Nullable private HeaderProvider<? super IN> headerProvider;
8886

@@ -91,6 +89,7 @@ public class KafkaRecordSerializationSchemaBuilder<IN> {
9189
*
9290
* @param partitioner
9391
* @return {@code this}
92+
* @deprecated use {@link #setPartitioner(KafkaPartitioner)}
9493
*/
9594
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
9695
FlinkKafkaPartitioner<? super T> partitioner) {
@@ -99,6 +98,19 @@ public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
9998
return self;
10099
}
101100

101+
/**
102+
* Sets a custom partitioner determining the target partition of the target topic.
103+
*
104+
* @param partitioner
105+
* @return {@code this}
106+
*/
107+
public <T extends IN> KafkaRecordSerializationSchemaBuilder<T> setPartitioner(
108+
KafkaPartitioner<? super T> partitioner) {
109+
KafkaRecordSerializationSchemaBuilder<T> self = self();
110+
self.partitioner = checkNotNull(partitioner);
111+
return self;
112+
}
113+
102114
/**
103115
* Sets a fixed topic which used as destination for all records.
104116
*
@@ -295,15 +307,15 @@ private static class KafkaRecordSerializationSchemaWrapper<IN>
295307
implements KafkaRecordSerializationSchema<IN> {
296308
private final SerializationSchema<? super IN> valueSerializationSchema;
297309
private final Function<? super IN, String> topicSelector;
298-
private final FlinkKafkaPartitioner<? super IN> partitioner;
310+
private final KafkaPartitioner<? super IN> partitioner;
299311
private final SerializationSchema<? super IN> keySerializationSchema;
300312
private final HeaderProvider<? super IN> headerProvider;
301313

302314
KafkaRecordSerializationSchemaWrapper(
303315
Function<? super IN, String> topicSelector,
304316
SerializationSchema<? super IN> valueSerializationSchema,
305317
@Nullable SerializationSchema<? super IN> keySerializationSchema,
306-
@Nullable FlinkKafkaPartitioner<? super IN> partitioner,
318+
@Nullable KafkaPartitioner<? super IN> partitioner,
307319
@Nullable HeaderProvider<? super IN> headerProvider) {
308320
this.topicSelector = checkNotNull(topicSelector);
309321
this.valueSerializationSchema = checkNotNull(valueSerializationSchema);

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java

+4-32
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,15 @@
1818
package org.apache.flink.streaming.connectors.kafka.partitioner;
1919

2020
import org.apache.flink.annotation.PublicEvolving;
21-
22-
import java.io.Serializable;
21+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2322

2423
/**
2524
* A {@link FlinkKafkaPartitioner} wraps logic on how to partition records across partitions of
2625
* multiple Kafka topics.
2726
*
28-
* @deprecated Will be turned into internal class when {@link
29-
* org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} is removed.
27+
* @deprecated Use {@link KafkaPartitioner} instead for {@link
28+
* org.apache.flink.connector.kafka.sink.KafkaSink}.
3029
*/
3130
@PublicEvolving
3231
@Deprecated
33-
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
34-
35-
private static final long serialVersionUID = -9086719227828020494L;
36-
37-
/**
38-
* Initializer for the partitioner. This is called once on each parallel sink instance of the
39-
* Flink Kafka producer. This method should be overridden if necessary.
40-
*
41-
* @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
42-
* @param parallelInstances the total number of parallel instances
43-
*/
44-
public void open(int parallelInstanceId, int parallelInstances) {
45-
// overwrite this method if needed.
46-
}
47-
48-
/**
49-
* Determine the id of the partition that the record should be written to.
50-
*
51-
* @param record the record value
52-
* @param key serialized key of the record
53-
* @param value serialized value of the record
54-
* @param targetTopic target topic for the record
55-
* @param partitions found partitions for the target topic
56-
* @return the id of the target partition
57-
*/
58-
public abstract int partition(
59-
T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
60-
}
32+
public abstract class FlinkKafkaPartitioner<T> implements KafkaPartitioner<T> {}

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.api.common.serialization.SerializationSchema;
22+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2223
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
2324
import org.apache.flink.connector.kafka.sink.KafkaSink;
24-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2525
import org.apache.flink.table.data.GenericRowData;
2626
import org.apache.flink.table.data.RowData;
2727
import org.apache.flink.types.RowKind;
@@ -44,7 +44,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
4444

4545
private final Set<String> topics;
4646
private final Pattern topicPattern;
47-
private final FlinkKafkaPartitioner<RowData> partitioner;
47+
private final KafkaPartitioner<RowData> partitioner;
4848
@Nullable private final SerializationSchema<RowData> keySerialization;
4949
private final SerializationSchema<RowData> valueSerialization;
5050
private final RowData.FieldGetter[] keyFieldGetters;
@@ -57,7 +57,7 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
5757
DynamicKafkaRecordSerializationSchema(
5858
@Nullable List<String> topics,
5959
@Nullable Pattern topicPattern,
60-
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
60+
@Nullable KafkaPartitioner<RowData> partitioner,
6161
@Nullable SerializationSchema<RowData> keySerialization,
6262
SerializationSchema<RowData> valueSerialization,
6363
RowData.FieldGetter[] keyFieldGetters,

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.configuration.ReadableConfig;
2626
import org.apache.flink.connector.base.DeliveryGuarantee;
27+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2728
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
2829
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
2930
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
3031
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
31-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3232
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
3333
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
3434
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
@@ -386,7 +386,7 @@ public static Properties getKafkaProperties(Map<String, String> tableOptions) {
386386
* The partitioner can be either "fixed", "round-robin" or a customized partitioner full class
387387
* name.
388388
*/
389-
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
389+
public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
390390
ReadableConfig tableOptions, ClassLoader classLoader) {
391391
return tableOptions
392392
.getOptional(SINK_PARTITIONER)
@@ -465,19 +465,19 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
465465
}
466466

467467
/** Returns a class value with the given class name. */
468-
private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
468+
private static <T> KafkaPartitioner<T> initializePartitioner(
469469
String name, ClassLoader classLoader) {
470470
try {
471471
Class<?> clazz = Class.forName(name, true, classLoader);
472-
if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
472+
if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
473473
throw new ValidationException(
474474
String.format(
475-
"Sink partitioner class '%s' should extend from the required class %s",
476-
name, FlinkKafkaPartitioner.class.getName()));
475+
"Sink partitioner class '%s' should implement the required class %s",
476+
name, KafkaPartitioner.class.getName()));
477477
}
478478
@SuppressWarnings("unchecked")
479-
final FlinkKafkaPartitioner<T> kafkaPartitioner =
480-
InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
479+
final KafkaPartitioner<T> kafkaPartitioner =
480+
InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
481481

482482
return kafkaPartitioner;
483483
} catch (ClassNotFoundException | FlinkException e) {

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.flink.api.common.typeinfo.TypeInformation;
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
2626
import org.apache.flink.connector.base.DeliveryGuarantee;
27+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2728
import org.apache.flink.connector.kafka.sink.KafkaSink;
2829
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
2930
import org.apache.flink.streaming.api.datastream.DataStream;
3031
import org.apache.flink.streaming.api.datastream.DataStreamSink;
31-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3232
import org.apache.flink.table.api.DataTypes;
3333
import org.apache.flink.table.connector.ChangelogMode;
3434
import org.apache.flink.table.connector.Projection;
@@ -44,12 +44,10 @@
4444
import org.apache.flink.table.types.DataType;
4545
import org.apache.flink.table.types.logical.LogicalType;
4646
import org.apache.flink.table.types.utils.DataTypeUtils;
47-
4847
import org.apache.kafka.clients.producer.ProducerConfig;
4948
import org.apache.kafka.common.header.Header;
5049

5150
import javax.annotation.Nullable;
52-
5351
import java.io.Serializable;
5452
import java.util.ArrayList;
5553
import java.util.Arrays;
@@ -125,7 +123,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
125123
protected final Properties properties;
126124

127125
/** Partitioner to select Kafka partition for each item. */
128-
protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
126+
protected final @Nullable KafkaPartitioner<RowData> partitioner;
129127

130128
/**
131129
* Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
@@ -150,7 +148,7 @@ public KafkaDynamicSink(
150148
@Nullable List<String> topics,
151149
@Nullable Pattern topicPattern,
152150
Properties properties,
153-
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
151+
@Nullable KafkaPartitioner<RowData> partitioner,
154152
DeliveryGuarantee deliveryGuarantee,
155153
boolean upsertMode,
156154
SinkBufferFlushMode flushMode,

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.configuration.Configuration;
2727
import org.apache.flink.configuration.ReadableConfig;
2828
import org.apache.flink.connector.base.DeliveryGuarantee;
29+
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
2930
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
3031
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
3132
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
3233
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
33-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
3434
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
3535
import org.apache.flink.table.api.ValidationException;
3636
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -48,12 +48,10 @@
4848
import org.apache.flink.table.factories.SerializationFormatFactory;
4949
import org.apache.flink.table.types.DataType;
5050
import org.apache.flink.types.RowKind;
51-
5251
import org.slf4j.Logger;
5352
import org.slf4j.LoggerFactory;
5453

5554
import javax.annotation.Nullable;
56-
5755
import java.time.Duration;
5856
import java.util.HashSet;
5957
import java.util.List;
@@ -427,7 +425,7 @@ protected KafkaDynamicSink createKafkaTableSink(
427425
@Nullable List<String> topics,
428426
@Nullable Pattern topicPattern,
429427
Properties properties,
430-
FlinkKafkaPartitioner<RowData> partitioner,
428+
KafkaPartitioner<RowData> partitioner,
431429
DeliveryGuarantee deliveryGuarantee,
432430
Integer parallelism,
433431
@Nullable String transactionalIdPrefix) {

0 commit comments

Comments (0)