Skip to content

Commit 3d4f4bb

Browse files
authored
[Fix] [Kafka Source] Kafka source uses the topic as the table name instead of fullName (#8401)
1 parent d1d9113 commit 3d4f4bb

File tree

16 files changed: +158 / -48 lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class MultiTableSink
5353
MultiTableAggregatedCommitInfo>,
5454
SupportSchemaEvolutionSink {
5555

56-
@Getter private final Map<String, SeaTunnelSink> sinks;
56+
@Getter private final Map<TablePath, SeaTunnelSink> sinks;
5757
private final int replicaNum;
5858

5959
public MultiTableSink(MultiTableFactoryContext context) {
@@ -72,9 +72,10 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWri
7272
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
7373
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
7474
for (int i = 0; i < replicaNum; i++) {
75-
for (String tableIdentifier : sinks.keySet()) {
76-
SeaTunnelSink sink = sinks.get(tableIdentifier);
75+
for (TablePath tablePath : sinks.keySet()) {
76+
SeaTunnelSink sink = sinks.get(tablePath);
7777
int index = context.getIndexOfSubtask() * replicaNum + i;
78+
String tableIdentifier = tablePath.toString();
7879
writers.put(
7980
SinkIdentifier.of(tableIdentifier, index),
8081
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
@@ -91,10 +92,10 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
9192
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
9293

9394
for (int i = 0; i < replicaNum; i++) {
94-
for (String tableIdentifier : sinks.keySet()) {
95-
SeaTunnelSink sink = sinks.get(tableIdentifier);
95+
for (TablePath tablePath : sinks.keySet()) {
96+
SeaTunnelSink sink = sinks.get(tablePath);
9697
int index = context.getIndexOfSubtask() * replicaNum + i;
97-
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tableIdentifier, index);
98+
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tablePath.toString(), index);
9899
List<?> state =
99100
states.stream()
100101
.map(
@@ -113,7 +114,7 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
113114
sink.restoreWriter(
114115
new SinkContextProxy(index, replicaNum, context), state));
115116
}
116-
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
117+
sinkWritersContext.put(sinkIdentifier, context);
117118
}
118119
}
119120
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
@@ -127,12 +128,13 @@ public Optional<Serializer<MultiTableState>> getWriterStateSerializer() {
127128
@Override
128129
public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() throws IOException {
129130
Map<String, SinkCommitter<?>> committers = new HashMap<>();
130-
for (String tableIdentifier : sinks.keySet()) {
131-
SeaTunnelSink sink = sinks.get(tableIdentifier);
131+
for (TablePath tablePath : sinks.keySet()) {
132+
SeaTunnelSink sink = sinks.get(tablePath);
132133
sink.createCommitter()
133134
.ifPresent(
134135
committer ->
135-
committers.put(tableIdentifier, (SinkCommitter<?>) committer));
136+
committers.put(
137+
tablePath.toString(), (SinkCommitter<?>) committer));
136138
}
137139
if (committers.isEmpty()) {
138140
return Optional.empty();
@@ -149,12 +151,12 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
149151
public Optional<SinkAggregatedCommitter<MultiTableCommitInfo, MultiTableAggregatedCommitInfo>>
150152
createAggregatedCommitter() throws IOException {
151153
Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new HashMap<>();
152-
for (String tableIdentifier : sinks.keySet()) {
153-
SeaTunnelSink sink = sinks.get(tableIdentifier);
154+
for (TablePath tablePath : sinks.keySet()) {
155+
SeaTunnelSink sink = sinks.get(tablePath);
154156
Optional<SinkAggregatedCommitter<?, ?>> sinkOptional = sink.createAggregatedCommitter();
155157
sinkOptional.ifPresent(
156158
sinkAggregatedCommitter ->
157-
aggCommitters.put(tableIdentifier, sinkAggregatedCommitter));
159+
aggCommitters.put(tablePath.toString(), sinkAggregatedCommitter));
158160
}
159161
if (aggCommitters.isEmpty()) {
160162
return Optional.empty();
@@ -171,7 +173,7 @@ public List<TablePath> getSinkTables() {
171173
tablePaths.add(
172174
((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath());
173175
} else {
174-
tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i]));
176+
tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
175177
}
176178
}
177179
return tablePaths;

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3333
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
3434
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
35+
import org.apache.seatunnel.api.table.catalog.TablePath;
3536
import org.apache.seatunnel.api.table.connector.TableSource;
3637
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3738
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -178,7 +179,7 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
178179

179180
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
180181
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createMultiTableSink(
181-
Map<String, SeaTunnelSink> sinks,
182+
Map<TablePath, SeaTunnelSink> sinks,
182183
ReadonlyConfig options,
183184
ClassLoader classLoader) {
184185
try {

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.sink.SeaTunnelSink;
22+
import org.apache.seatunnel.api.table.catalog.TablePath;
2223

2324
import lombok.Getter;
2425

@@ -27,10 +28,10 @@
2728
@Getter
2829
public class MultiTableFactoryContext extends TableSinkFactoryContext {
2930

30-
private final Map<String, SeaTunnelSink> sinks;
31+
private final Map<TablePath, SeaTunnelSink> sinks;
3132

3233
public MultiTableFactoryContext(
33-
ReadonlyConfig options, ClassLoader classLoader, Map<String, SeaTunnelSink> sinks) {
34+
ReadonlyConfig options, ClassLoader classLoader, Map<TablePath, SeaTunnelSink> sinks) {
3435
super(null, options, classLoader);
3536
this.sinks = sinks;
3637
}

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata(
137137
return consumerMetadataList.stream()
138138
.collect(
139139
Collectors.toMap(
140-
consumerMetadata -> TablePath.of(consumerMetadata.getTopic()),
140+
consumerMetadata -> TablePath.of(null, consumerMetadata.getTopic()),
141141
consumerMetadata -> consumerMetadata));
142142
}
143143

@@ -208,7 +208,7 @@ private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
208208
private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
209209
Optional<Map<String, Object>> schemaOptions =
210210
readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
211-
TablePath tablePath = TablePath.of(readonlyConfig.get(TOPIC));
211+
TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
212212
TableSchema tableSchema;
213213
if (schemaOptions.isPresent()) {
214214
tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);

seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.seatunnel.api.source.SeaTunnelSource;
2929
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3030
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
31+
import org.apache.seatunnel.api.table.catalog.TablePath;
3132
import org.apache.seatunnel.api.table.connector.TableSource;
3233
import org.apache.seatunnel.api.table.factory.Factory;
3334
import org.apache.seatunnel.api.table.factory.FactoryException;
@@ -184,7 +185,7 @@ public static SeaTunnelSink createSink(
184185
return sink;
185186
} else {
186187
if (catalogTables.size() > 1) {
187-
Map<String, SeaTunnelSink> sinks = new HashMap<>();
188+
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
188189
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
189190
catalogTables.forEach(
190191
catalogTable -> {
@@ -202,7 +203,7 @@ public static SeaTunnelSink createSink(
202203
.createSink(context)
203204
.createSink();
204205
action.setJobContext(jobContext);
205-
sinks.put(catalogTable.getTablePath().toString(), action);
206+
sinks.put(catalogTable.getTablePath(), action);
206207
});
207208
return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, classLoader);
208209
}

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.api.sink.SupportSaveMode;
3131
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3232
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
33+
import org.apache.seatunnel.api.table.catalog.TablePath;
3334
import org.apache.seatunnel.api.table.factory.Factory;
3435
import org.apache.seatunnel.api.table.factory.FactoryUtil;
3536
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -106,7 +107,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
106107
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
107108
Optional<? extends Factory> factory = plugins.get(i);
108109
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
109-
Map<String, SeaTunnelSink> sinks = new HashMap<>();
110+
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
110111
if (fallBack) {
111112
for (CatalogTable catalogTable : stream.getCatalogTables()) {
112113
SeaTunnelSink fallBackSink =
@@ -122,8 +123,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
122123
fallBackSink.setTypeInfo(sourceType);
123124
handleSaveMode(fallBackSink);
124125
TableIdentifier tableId = catalogTable.getTableId();
125-
String tableIdName = tableId.toTablePath().toString();
126-
sinks.put(tableIdName, fallBackSink);
126+
sinks.put(tableId.toTablePath(), fallBackSink);
127127
}
128128
} else {
129129
for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -141,8 +141,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
141141
seaTunnelSink.setJobContext(jobContext);
142142
handleSaveMode(seaTunnelSink);
143143
TableIdentifier tableId = catalogTable.getTableId();
144-
String tableIdName = tableId.toTablePath().toString();
145-
sinks.put(tableIdName, seaTunnelSink);
144+
sinks.put(tableId.toTablePath(), seaTunnelSink);
146145
}
147146
}
148147
SeaTunnelSink sink =
@@ -168,7 +167,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
168167

169168
// if not support multi table, rollback
170169
public SeaTunnelSink tryGenerateMultiTableSink(
171-
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
170+
Map<TablePath, SeaTunnelSink> sinks,
171+
ReadonlyConfig sinkConfig,
172+
ClassLoader classLoader) {
172173
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
173174
log.info("Unsupported multi table sink api, rollback to sink template");
174175
// choose the first sink

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.api.sink.SupportSaveMode;
3131
import org.apache.seatunnel.api.table.catalog.CatalogTable;
3232
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
33+
import org.apache.seatunnel.api.table.catalog.TablePath;
3334
import org.apache.seatunnel.api.table.factory.Factory;
3435
import org.apache.seatunnel.api.table.factory.FactoryUtil;
3536
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -107,7 +108,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
107108
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
108109
Optional<? extends Factory> factory = plugins.get(i);
109110
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
110-
Map<String, SeaTunnelSink> sinks = new HashMap<>();
111+
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
111112
if (fallBack) {
112113
for (CatalogTable catalogTable : stream.getCatalogTables()) {
113114
SeaTunnelSink fallBackSink =
@@ -123,8 +124,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
123124
fallBackSink.setTypeInfo(sourceType);
124125
handleSaveMode(fallBackSink);
125126
TableIdentifier tableId = catalogTable.getTableId();
126-
String tableIdName = tableId.toTablePath().toString();
127-
sinks.put(tableIdName, fallBackSink);
127+
sinks.put(tableId.toTablePath(), fallBackSink);
128128
}
129129
} else {
130130
for (CatalogTable catalogTable : stream.getCatalogTables()) {
@@ -142,8 +142,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
142142
seaTunnelSink.setJobContext(jobContext);
143143
handleSaveMode(seaTunnelSink);
144144
TableIdentifier tableId = catalogTable.getTableId();
145-
String tableIdName = tableId.toTablePath().toString();
146-
sinks.put(tableIdName, seaTunnelSink);
145+
sinks.put(tableId.toTablePath(), seaTunnelSink);
147146
}
148147
}
149148
SeaTunnelSink sink =
@@ -174,7 +173,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
174173

175174
// if not support multi table, rollback
176175
public SeaTunnelSink tryGenerateMultiTableSink(
177-
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
176+
Map<TablePath, SeaTunnelSink> sinks,
177+
ReadonlyConfig sinkConfig,
178+
ClassLoader classLoader) {
178179
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
179180
log.info("Unsupported multi table sink api, rollback to sink template");
180181
// choose the first sink

seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.api.sink.SupportSaveMode;
2828
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
2929
import org.apache.seatunnel.api.table.catalog.CatalogTable;
30+
import org.apache.seatunnel.api.table.catalog.TablePath;
3031
import org.apache.seatunnel.api.table.factory.Factory;
3132
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
3233
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -168,7 +169,7 @@ public void handleSaveMode(SeaTunnelSink sink) {
168169
}
169170
}
170171
} else if (sink instanceof MultiTableSink) {
171-
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
172+
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
172173
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
173174
handleSaveMode(seaTunnelSink);
174175
}

seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.api.sink.SupportSaveMode;
2828
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
2929
import org.apache.seatunnel.api.table.catalog.CatalogTable;
30+
import org.apache.seatunnel.api.table.catalog.TablePath;
3031
import org.apache.seatunnel.api.table.factory.Factory;
3132
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
3233
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
@@ -169,7 +170,7 @@ public void handleSaveMode(SeaTunnelSink sink) {
169170
}
170171
}
171172
} else if (sink instanceof MultiTableSink) {
172-
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
173+
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
173174
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
174175
handleSaveMode(seaTunnelSink);
175176
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,26 @@ public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer contain
269269
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
270270
}
271271

272+
@TestTemplate
273+
public void testSourceKafkaTopicWithMultipleDotConsoleAssertCatalogTable(
274+
TestContainer container) throws IOException, InterruptedException {
275+
TextSerializationSchema serializer =
276+
TextSerializationSchema.builder()
277+
.seaTunnelRowType(SEATUNNEL_ROW_TYPE)
278+
.delimiter(",")
279+
.build();
280+
generateTestData(
281+
row ->
282+
new ProducerRecord<>(
283+
"test.multiple.point.topic.json", null, serializer.serialize(row)),
284+
0,
285+
10);
286+
Container.ExecResult execResult =
287+
container.executeJob(
288+
"/textFormatIT/kafka_source_topic_multiple_point_text_to_console.conf");
289+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
290+
}
291+
272292
@TestTemplate
273293
public void testSourceKafkaJsonToConsole(TestContainer container)
274294
throws IOException, InterruptedException {
@@ -295,7 +315,7 @@ public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer c
295315
DEFAULT_FORMAT,
296316
DEFAULT_FIELD_DELIMITER,
297317
null);
298-
generateTestData(serializer::serializeRow, 0, 100);
318+
generateTestData(serializer::serializeRow, 0, 10);
299319
Container.ExecResult execResult =
300320
container.executeJob(
301321
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");

0 commit comments

Comments (0)