
Commit cadab02

update upsert kafka source as well

1 parent 1d93244 commit cadab02

4 files changed: +37 -15 lines changed

docs/content.zh/docs/connectors/table/upsert-kafka.md

Lines changed: 1 addition & 1 deletion

@@ -119,7 +119,7 @@ of all available metadata fields.
       <td>Required</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The Kafka topic name to read from and write to.</td>
+      <td>Topic name to read data from when the table is used as a source, or topic name to write to when the table is used as a sink. A semicolon-separated list such as <code>'topic-1;topic-2'</code> is also supported as a source topic list. Note that only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, in which case the provided list is treated as an allow list of valid values for the `topic` metadata column, and the 'topic' metadata column of the sink table is writable and must be specified.</td>
   </tr>
   <tr>
       <td><h5>properties.bootstrap.servers</h5></td>

docs/content/docs/connectors/table/upsert-kafka.md

Lines changed: 1 addition & 1 deletion

@@ -129,7 +129,7 @@ Connector Options
       <td>required</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The Kafka topic name to read from and write to.</td>
+      <td>Topic name(s) to read data from when the table is used as a source, or to write to when the table is used as a sink. A semicolon-separated list such as <code>'topic-1;topic-2'</code> is also supported as a source topic list. Note that only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, in which case the provided list is treated as an allow list of valid values for the `topic` metadata column, and the 'topic' metadata column of the sink table is writable and must be specified.</td>
   </tr>
   <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
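
For illustration, a minimal sketch of a DDL that exercises the source-side topic list this commit enables; the table name, schema, broker address, and JSON formats are hypothetical and not part of the commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class UpsertKafkaTopicListExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // An upsert-kafka source reading from two topics via the
            // semicolon-separated 'topic' option; before this commit the
            // upsert-kafka factory rejected such a list for sources.
            tEnv.executeSql(
                    "CREATE TABLE pageviews_per_region (\n"
                            + "  region STRING,\n"
                            + "  pv BIGINT,\n"
                            + "  PRIMARY KEY (region) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka',\n"
                            + "  'topic' = 'topic-1;topic-2',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json'\n"
                            + ")");
        }
    }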

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java

Lines changed: 0 additions & 10 deletions

@@ -250,7 +250,6 @@ private static void validateSource(
             Format keyFormat,
             Format valueFormat,
             int[] primaryKeyIndexes) {
-        validateTopic(tableOptions);
         validateScanBoundedMode(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
         validatePKConstraints(primaryKeyIndexes);
@@ -266,15 +265,6 @@ private static void validateSink(
         validateSinkBufferFlush(tableOptions);
     }
 
-    private static void validateTopic(ReadableConfig tableOptions) {
-        List<String> topic = tableOptions.get(TOPIC);
-        if (topic.size() > 1) {
-            throw new ValidationException(
-                    "The 'upsert-kafka' connector doesn't support topic list now. "
-                            + "Please use single topic as the value of the parameter 'topic'.");
-        }
-    }
-
     private static void validateFormat(
             Format keyFormat, Format valueFormat, ReadableConfig tableOptions) {
         if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
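
Dropping validateTopic is sufficient because the 'topic' option is already list-typed (the removed check reads it as a List<String>) and KafkaDynamicSource already consumes a list of topics. A simplified sketch of such a declaration, assuming it mirrors the shared Kafka connector options; the class name and description text here are illustrative:

    import java.util.List;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public final class TopicOptionSketch {
        // A list-typed String option: Flink's Configuration splits the raw
        // value on semicolons, so 'topic-1;topic-2' is parsed into
        // ["topic-1", "topic-2"] with no extra size check needed.
        public static final ConfigOption<List<String>> TOPIC =
                ConfigOptions.key("topic")
                        .stringType()
                        .asList()
                        .noDefaultValue()
                        .withDescription("Topic name(s) to read data from or write to.");
    }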

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java

Lines changed: 35 additions & 3 deletions

@@ -167,7 +167,39 @@ public void testTableSource() {
                         SOURCE_KEY_FIELDS,
                         SOURCE_VALUE_FIELDS,
                         null,
-                        SOURCE_TOPIC,
+                        Collections.singletonList(SOURCE_TOPIC),
+                        UPSERT_KAFKA_SOURCE_PROPERTIES);
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource;
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
+    @Test
+    public void testTableSourceWithTopicList() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(),
+                        options -> {
+                            options.put(
+                                    "topic", String.format("%s;%s", SOURCE_TOPIC, SOURCE_TOPIC));
+                        });
+        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final DynamicTableSource actualSource =
+                createTableSource(SOURCE_SCHEMA, modifiedOptions);
+
+        final KafkaDynamicSource expectedSource =
+                createExpectedScanSource(
+                        producedDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        SOURCE_KEY_FIELDS,
+                        SOURCE_VALUE_FIELDS,
+                        null,
+                        Arrays.asList(SOURCE_TOPIC, SOURCE_TOPIC),
                         UPSERT_KAFKA_SOURCE_PROPERTIES);
         assertThat(actualSource).isEqualTo(expectedSource);
@@ -819,7 +851,7 @@ private KafkaDynamicSource createExpectedScanSource(
             int[] keyFields,
             int[] valueFields,
             String keyPrefix,
-            String topic,
+            List<String> topic,
             Properties properties) {
         return new KafkaDynamicSource(
                 producedDataType,
@@ -828,7 +860,7 @@ private KafkaDynamicSource createExpectedScanSource(
                 keyFields,
                 valueFields,
                 keyPrefix,
-                Collections.singletonList(topic),
+                topic,
                 null,
                 properties,
                 StartupMode.EARLIEST,
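
One detail worth noting in the new test: the option value repeats the same topic (String.format("%s;%s", SOURCE_TOPIC, SOURCE_TOPIC)) and the expected source is built with Arrays.asList(SOURCE_TOPIC, SOURCE_TOPIC), so the parsed list evidently preserves order and duplicates rather than de-duplicating topics.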
