Commit a16abd5

[FLINK-36913][pipeline-connector][kafka] Introduce option to define custom mapping from upstream table id to downstream topic name
This closes #3805
1 parent 77785c1 commit a16abd5

9 files changed: +198, -9 lines changed

docs/content.zh/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 7 additions & 0 deletions
@@ -143,6 +143,13 @@ Pipeline 连接器配置项
       <td>String</td>
       <td>Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说,可以使用这种方式 'key1:value1,key2:value2'。 </td>
     </tr>
+    <tr>
+      <td>sink.tableId-to-topic.mapping</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>自定义的上游表名到下游 Kafka Topic 名的映射关系。每个映射关系由 `;` 分割,上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。举个例子,可以将 `sink.tableId-to-topic.mapping` 配置为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。</td>
+    </tr>
     </tbody>
   </table>
 </div>

docs/content/docs/connectors/pipeline-connectors/kafka.md

Lines changed: 7 additions & 0 deletions
@@ -141,6 +141,13 @@ Pipeline Connector Options
       <td>String</td>
       <td>custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. </td>
     </tr>
+    <tr>
+      <td>sink.tableId-to-topic.mapping</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Custom mapping from each upstream tableId to a downstream Kafka topic. Mappings are separated by `;`, and the upstream tableId is separated from the downstream Kafka topic by `:`. For example, `sink.tableId-to-topic.mapping` can be set to `mydb.mytable1:topic1;mydb.mytable2:topic2`.</td>
+    </tr>
     </tbody>
   </table>
 </div>
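For readers of the option table above, the following standalone Java snippet (illustrative only; the class and method names are not part of the connector) shows how a value such as `mydb.mytable1:topic1;mydb.mytable2:topic2` decomposes under the documented `;` and `:` delimiters:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: split the documented "tableId:topic" list on ';' and ':'.
public class TopicMappingFormatExample {

    public static Map<String, String> parse(String optionValue) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String rule : optionValue.split(";")) {
            String[] parts = rule.split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected '<tableId>:<topic>' but got: " + rule);
            }
            mapping.put(parts[0], parts[1]);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Prints {mydb.mytable1=topic1, mydb.mytable2=topic2}
        System.out.println(parse("mydb.mytable1:topic1;mydb.mytable2:topic2"));
    }
}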

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java

Lines changed: 7 additions & 2 deletions
@@ -53,6 +53,8 @@ public class KafkaDataSink implements DataSink {

     final String customHeaders;

+    final String tableMapping;
+
     public KafkaDataSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProperties,
@@ -62,7 +64,8 @@ public KafkaDataSink(
             SerializationSchema<Event> valueSerialization,
             String topic,
             boolean addTableToHeaderEnabled,
-            String customHeaders) {
+            String customHeaders,
+            String tableMapping) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProperties = kafkaProperties;
         this.partitionStrategy = partitionStrategy;
@@ -72,6 +75,7 @@ public KafkaDataSink(
         this.topic = topic;
         this.addTableToHeaderEnabled = addTableToHeaderEnabled;
         this.customHeaders = customHeaders;
+        this.tableMapping = tableMapping;
     }

     @Override
@@ -92,7 +96,8 @@ public EventSinkProvider getEventSinkProvider() {
                         valueSerialization,
                         topic,
                         addTableToHeaderEnabled,
-                        customHeaders))
+                        customHeaders,
+                        tableMapping))
                 .build());
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java

Lines changed: 5 additions & 1 deletion
@@ -41,6 +41,7 @@
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
 import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;

@@ -92,6 +93,7 @@ public DataSink createDataSink(Context context) {
                 context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
         PartitionStrategy partitionStrategy =
                 context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
+        String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_ID_TO_TOPIC_MAPPING);
         return new KafkaDataSink(
                 deliveryGuarantee,
                 kafkaProperties,
@@ -101,7 +103,8 @@ public DataSink createDataSink(Context context) {
                 valueSerialization,
                 topic,
                 addTableToHeaderEnabled,
-                customHeaders);
+                customHeaders,
+                tableMapping);
     }

     @Override
@@ -124,6 +127,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
         options.add(SINK_CUSTOM_HEADER);
         options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
+        options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
         return options;
     }
 }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java

Lines changed: 21 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.kafka.sink;

 import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.description.Description;
 import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
 import org.apache.flink.connector.base.DeliveryGuarantee;

@@ -29,6 +30,10 @@ public class KafkaDataSinkOptions {
     // Prefix for Kafka specific properties.
     public static final String PROPERTIES_PREFIX = "properties.";

+    public static final String DELIMITER_TABLE_MAPPINGS = ";";
+
+    public static final String DELIMITER_SELECTOR_TOPIC = ":";
+
     public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
             key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
@@ -79,4 +84,20 @@ public class KafkaDataSinkOptions {
                     .defaultValue("")
                     .withDescription(
                             "custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.");
+
+    public static final ConfigOption<String> SINK_TABLE_ID_TO_TOPIC_MAPPING =
+            key("sink.tableId-to-topic.mapping")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ")
+                                    .text(DELIMITER_TABLE_MAPPINGS)
+                                    .text(
+                                            ", separate upstream tableId selectors and downstream Kafka topic by ")
+                                    .text(DELIMITER_SELECTOR_TOPIC)
+                                    .text(
+                                            ". For example, we can set 'sink.tableId-to-topic.mapping' like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
+                                    .build());
 }
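The new SINK_TABLE_ID_TO_TOPIC_MAPPING option has no default value, so it yields null when unset. A minimal sketch of reading it through the CDC Configuration API, mirroring what the factory and tests in this commit do (the wrapper class here is illustrative only):

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: build a Configuration from a map and read the new option.
public class ReadMappingOptionExample {

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(
                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
                "mydb.mytable1:topic1;mydb.mytable2:topic2");
        Configuration conf = Configuration.fromMap(props);

        // noDefaultValue() means this returns null when the option was not configured.
        String mappingRule = conf.get(KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
        System.out.println(mappingRule);
    }
}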

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java

Lines changed: 33 additions & 2 deletions
@@ -22,6 +22,8 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -57,6 +59,13 @@ public class PipelineKafkaRecordSerializationSchema
     // key value pairs to be put into Kafka Record Header.
     public final Map<String, String> customHeaders;

+    private final String mappingRuleString;
+
+    private Map<Selectors, String> selectorsToTopicMap;
+
+    // A cache to speed up TableId to Topic mapping.
+    private Map<TableId, String> tableIdToTopicCache;
+
     public static final String NAMESPACE_HEADER_KEY = "namespace";

     public static final String SCHEMA_NAME_HEADER_KEY = "schemaName";
@@ -69,7 +78,8 @@ public class PipelineKafkaRecordSerializationSchema
             SerializationSchema<Event> valueSerialization,
             String unifiedTopic,
             boolean addTableToHeaderEnabled,
-            String customHeaderString) {
+            String customHeaderString,
+            String mappingRuleString) {
         this.keySerialization = keySerialization;
         this.valueSerialization = checkNotNull(valueSerialization);
         this.unifiedTopic = unifiedTopic;
@@ -90,6 +100,7 @@ public class PipelineKafkaRecordSerializationSchema
             }
         }
         partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
+        this.mappingRuleString = mappingRuleString;
     }

     @Override
@@ -102,7 +113,7 @@ public ProducerRecord<byte[], byte[]> serialize(
             // skip sending SchemaChangeEvent.
             return null;
         }
-        String topic = unifiedTopic == null ? changeEvent.tableId().toString() : unifiedTopic;
+        String topic = inferTopicName(changeEvent.tableId());
         RecordHeaders recordHeaders = new RecordHeaders();
         if (addTableToHeaderEnabled) {
             String namespace =
@@ -128,10 +139,30 @@ public ProducerRecord<byte[], byte[]> serialize(
                 topic, partition, null, keySerialized, valueSerialized, recordHeaders);
     }

+    private String inferTopicName(TableId tableId) {
+        return tableIdToTopicCache.computeIfAbsent(
+                tableId,
+                (table -> {
+                    if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
+                        return unifiedTopic;
+                    }
+                    if (selectorsToTopicMap != null && !selectorsToTopicMap.isEmpty()) {
+                        for (Map.Entry<Selectors, String> entry : selectorsToTopicMap.entrySet()) {
+                            if (entry.getKey().isMatch(tableId)) {
+                                return entry.getValue();
+                            }
+                        }
+                    }
+                    return table.toString();
+                }));
+    }
+
     @Override
     public void open(
             SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
             throws Exception {
+        this.selectorsToTopicMap = KafkaSinkUtils.parseSelectorsToTopicMap(mappingRuleString);
+        this.tableIdToTopicCache = new HashMap<>();
         valueSerialization.open(context);
     }
 }
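The precedence that inferTopicName establishes is: a configured unified topic wins, otherwise the first mapping rule whose selector matches the table (rules keep their insertion order), otherwise the TableId string itself, with results cached per table. A simplified, standalone sketch of that precedence; selector matching is reduced to exact string keys here purely for illustration:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of the topic-resolution precedence used by the serialization schema.
public class TopicResolutionSketch {

    private final String unifiedTopic;              // value of the 'topic' option, may be null
    private final Map<String, String> tableToTopic; // parsed mapping rules, insertion-ordered
    private final Map<String, String> cache = new HashMap<>();

    TopicResolutionSketch(String unifiedTopic, Map<String, String> tableToTopic) {
        this.unifiedTopic = unifiedTopic;
        this.tableToTopic = tableToTopic;
    }

    String inferTopicName(String tableId) {
        return cache.computeIfAbsent(
                tableId,
                id -> {
                    if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
                        return unifiedTopic; // 1. unified topic overrides everything
                    }
                    String mapped = tableToTopic.get(id); // 2. first matching rule (exact match here)
                    if (mapped != null) {
                        return mapped;
                    }
                    return id; // 3. fall back to the table id itself
                });
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("mydb.mytable1", "topic1");
        TopicResolutionSketch sketch = new TopicResolutionSketch(null, mapping);
        System.out.println(sketch.inferTopicName("mydb.mytable1")); // topic1
        System.out.println(sketch.inferTopicName("mydb.other"));    // mydb.other
    }
}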
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.utils;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_SELECTOR_TOPIC;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_TABLE_MAPPINGS;
+
+/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */
+public class KafkaSinkUtils {
+
+    /** Parse the mapping text to a map from Selectors to Kafka Topic name. */
+    public static Map<Selectors, String> parseSelectorsToTopicMap(String mappingRuleString) {
+        // Keep the order.
+        Map<Selectors, String> result = new LinkedHashMap<>();
+        if (mappingRuleString == null || mappingRuleString.isEmpty()) {
+            return result;
+        }
+        for (String mapping : mappingRuleString.split(DELIMITER_TABLE_MAPPINGS)) {
+            String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC);
+            Preconditions.checkArgument(
+                    selectorsAndTopic.length == 2,
+                    "Please check your configuration of "
+                            + KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
+            Selectors selectors =
+                    new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build();
+            result.put(selectors, selectorsAndTopic[1]);
+        }
+        return result;
+    }
+}
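A hedged usage sketch of the new util class: parse a mapping rule and resolve a TableId against it the way the serialization schema does (first match wins). It assumes the two-part TableId.tableId("mydb", "mytable1") factory and that Selectors built from 'mydb.mytable1' match such an id, as routing selectors do:

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

import java.util.Map;

// Usage sketch (assumptions noted above): resolve a table id to a topic via the parsed selectors.
public class KafkaSinkUtilsUsageSketch {

    public static void main(String[] args) {
        Map<Selectors, String> selectorsToTopic =
                KafkaSinkUtils.parseSelectorsToTopicMap(
                        "mydb.mytable1:topic1;mydb.mytable2:topic2");

        TableId tableId = TableId.tableId("mydb", "mytable1");
        String topic =
                selectorsToTopic.entrySet().stream()
                        .filter(entry -> entry.getKey().isMatch(tableId))
                        .map(Map.Entry::getValue)
                        .findFirst()
                        .orElse(tableId.toString()); // fall back to the table id string

        System.out.println(topic); // expected: topic1
    }
}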

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java

Lines changed: 3 additions & 0 deletions
@@ -39,6 +39,9 @@ void testCreateDataSink() {
         Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);

         Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
+        conf.set(
+                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING,
+                "mydb.mytable1:topic1;mydb.mytable2:topic2");
         DataSink dataSink =
                 sinkFactory.createDataSink(
                         new FactoryHelper.DefaultContext(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 63 additions & 4 deletions
@@ -263,8 +263,7 @@ void testDebeziumJsonFormat() throws Exception {

         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, false, 0);
-        final long recordsCount = 5;
-        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        assertThat(collectedRecords).hasSize(5);
         ObjectMapper mapper =
                 JacksonMapperFactory.createObjectMapper()
                         .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
@@ -325,8 +324,7 @@ void testCanalJsonFormat() throws Exception {

         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic, false, 0);
-        final long recordsCount = 5;
-        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        assertThat(collectedRecords).hasSize(5);
         for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
             assertThat(
                     consumerRecord
@@ -560,6 +558,67 @@ void testTopicAndHeaderOption() throws Exception {
         checkProducerLeak();
     }

+    @Test
+    void testSinkTableMapping() throws Exception {
+        final StreamExecutionEnvironment env = new LocalStreamEnvironment();
+        env.enableCheckpointing(1000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        final DataStream<Event> source = env.fromData(createSourceEvents(), new EventTypeInfo());
+        Map<String, String> config = new HashMap<>();
+        config.put(
+                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
+                "default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping");
+        Properties properties = getKafkaClientConfiguration();
+        properties.forEach(
+                (key, value) ->
+                        config.put(
+                                KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
+                                value.toString()));
+        source.sinkTo(
+                ((FlinkSinkProvider)
+                                (new KafkaDataSinkFactory()
+                                                .createDataSink(
+                                                        new FactoryHelper.DefaultContext(
+                                                                Configuration.fromMap(config),
+                                                                Configuration.fromMap(new HashMap<>()),
+                                                                this.getClass().getClassLoader()))
+                                                .getEventSinkProvider()))
+                        .getSink());
+        env.execute();
+
+        final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+                drainAllRecordsFromTopic("test_topic_mapping", false, 0);
+        final long recordsCount = 5;
+        assertThat(recordsCount).isEqualTo(collectedRecords.size());
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        List<JsonNode> expected =
+                Arrays.asList(
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
+                        mapper.readTree(
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
+        assertThat(deserializeValues(collectedRecords)).containsAll(expected);
+        checkProducerLeak();
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
             String topic, boolean committed, int... partitionArr) {
         Properties properties = getKafkaClientConfiguration();
