Skip to content

Commit b6fe5e9

Browse files
committed
Check individual topics in ITCase
1 parent cadab02 commit b6fe5e9

File tree

2 files changed

+47
-20
lines changed

2 files changed

+47
-20
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -200,29 +200,51 @@ public void testKafkaSourceSinkWithTopicList() throws Exception {
200200
// ---------- Produce an event time stream into Kafka -------------------
201201
String groupId = getStandardProps().getProperty("group.id");
202202
String bootstraps = getBootstrapServers();
203-
final String createTable =
203+
final String createTableTemplate =
204+
"CREATE TABLE %s (\n"
205+
+ " `topic` STRING METADATA,\n"
206+
+ " `user_id` INT,\n"
207+
+ " `item_id` INT,\n"
208+
+ " `behavior` STRING\n"
209+
+ ") WITH (\n"
210+
+ " 'connector' = '%s',\n"
211+
+ " 'topic' = '%s',\n"
212+
+ " 'properties.bootstrap.servers' = '%s',\n"
213+
+ " 'properties.group.id' = '%s',\n"
214+
+ " 'scan.startup.mode' = 'earliest-offset',\n"
215+
+ " %s\n"
216+
+ ")\n";
217+
final String createTopicListTable =
204218
String.format(
205-
"CREATE TABLE kafka (\n"
206-
+ " `topic` STRING METADATA,\n"
207-
+ " `user_id` INT,\n"
208-
+ " `item_id` INT,\n"
209-
+ " `behavior` STRING\n"
210-
+ ") WITH (\n"
211-
+ " 'connector' = '%s',\n"
212-
+ " 'topic' = '%s;%s',\n"
213-
+ " 'properties.bootstrap.servers' = '%s',\n"
214-
+ " 'properties.group.id' = '%s',\n"
215-
+ " 'scan.startup.mode' = 'earliest-offset',\n"
216-
+ " %s\n"
217-
+ ")\n",
219+
"kafka",
220+
createTableTemplate,
221+
KafkaDynamicTableFactory.IDENTIFIER,
222+
String.join(";", Arrays.asList(topic1, topic2)),
223+
bootstraps,
224+
groupId,
225+
formatOptions());
226+
final String createTopic1Table =
227+
String.format(
228+
"topic1",
229+
createTableTemplate,
218230
KafkaDynamicTableFactory.IDENTIFIER,
219231
topic1,
232+
bootstraps,
233+
groupId,
234+
formatOptions());
235+
final String createTopic2Table =
236+
String.format(
237+
"topic2",
238+
createTableTemplate,
239+
KafkaDynamicTableFactory.IDENTIFIER,
220240
topic2,
221241
bootstraps,
222242
groupId,
223243
formatOptions());
224244

225-
tEnv.executeSql(createTable);
245+
tEnv.executeSql(createTopicListTable);
246+
tEnv.executeSql(createTopic1Table);
247+
tEnv.executeSql(createTopic2Table);
226248

227249
List<Row> values =
228250
Arrays.asList(
@@ -233,14 +255,20 @@ public void testKafkaSourceSinkWithTopicList() throws Exception {
233255
// ---------- Consume stream from Kafka -------------------
234256

235257
List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
236-
258+
List<Row> topic1Results = collectAllRows(tEnv.sqlQuery("SELECT * from topic1"));
259+
List<Row> topic2Results = collectAllRows(tEnv.sqlQuery("SELECT * from topic2"));
237260
assertThat(results)
238-
.containsExactly(
261+
.containsExactlyInAnyOrder(
239262
Row.of(topic1, 1, 1102, "behavior 1"),
240263
Row.of(topic2, 2, 1103, "behavior 2"));
264+
assertThat(topic1Results)
265+
.containsExactly(
266+
Row.of(topic1, 1, 1102, "behavior 1"));
267+
assertThat(topic2Results)
268+
.containsExactly(
269+
Row.of(topic2, 2, 1103, "behavior 2"));
241270

242271
// ------------- cleanup -------------------
243-
244272
deleteTestTopic(topic1);
245273
deleteTestTopic(topic2);
246274
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,7 @@ public void testTableSourceWithTopicList() {
188188
});
189189
final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
190190
// Construct table source using options and table source factory
191-
final DynamicTableSource actualSource =
192-
createTableSource(SOURCE_SCHEMA, modifiedOptions);
191+
final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions);
193192

194193
final KafkaDynamicSource expectedSource =
195194
createExpectedScanSource(

0 commit comments

Comments
 (0)