Skip to content

Commit d444048

Browse files
committed
support sync table coments in binlog phase
1 parent 947dffc commit d444048

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java

+16
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
122122
if (tableEditor.hasPrimaryKey()) {
123123
builder.primaryKey(tableEditor.primaryKeyColumnNames());
124124
}
125+
builder.comment(tableEditor.create().comment());
125126
changes.add(
126127
new CreateTableEvent(
127128
toCdcTableId(tableEditor.tableId()), builder.build()));
@@ -413,6 +414,21 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
413414
super.exitDropTable(ctx);
414415
}
415416

417+
@Override
418+
public void enterTableOptionComment(MySqlParser.TableOptionCommentContext ctx) {
419+
if (!parser.skipComments()) {
420+
parser.runIfNotNull(
421+
() -> {
422+
if (ctx.COMMENT() != null) {
423+
tableEditor.setComment(
424+
parser.withoutQuotes(ctx.STRING_LITERAL().getText()));
425+
}
426+
},
427+
tableEditor);
428+
}
429+
super.enterTableOptionComment(ctx);
430+
}
431+
416432
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
417433
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
418434
dbzColumn.name(),

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

+70
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
8080
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
8181
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
82+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
8283
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
8384
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
8485
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
@@ -1076,6 +1077,75 @@ public void testIncludeComments() throws Exception {
10761077
actual.stream().map(Object::toString).collect(Collectors.toList()));
10771078
}
10781079

1080+
@Test
1081+
public void testIncludeCommentsForScanBinlogNewlyAddedTableEnabled() throws Exception {
1082+
env.setParallelism(1);
1083+
inventoryDatabase.createAndInitialize();
1084+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
1085+
TableId newTableId =
1086+
TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments2");
1087+
1088+
Map<String, String> options = new HashMap<>();
1089+
options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
1090+
options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
1091+
options.put(USERNAME.key(), TEST_USER);
1092+
options.put(PASSWORD.key(), TEST_PASSWORD);
1093+
options.put(SERVER_TIME_ZONE.key(), "UTC");
1094+
options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
1095+
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
1096+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products\\.*");
1097+
Factory.Context context =
1098+
new FactoryHelper.DefaultContext(
1099+
Configuration.fromMap(options), null, this.getClass().getClassLoader());
1100+
1101+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
1102+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
1103+
FlinkSourceProvider sourceProvider =
1104+
(FlinkSourceProvider) dataSource.getEventSourceProvider();
1105+
1106+
CloseableIterator<Event> events =
1107+
env.fromSource(
1108+
sourceProvider.getSource(),
1109+
WatermarkStrategy.noWatermarks(),
1110+
MySqlDataSourceFactory.IDENTIFIER,
1111+
new EventTypeInfo())
1112+
.executeAndCollect();
1113+
Thread.sleep(5_000);
1114+
1115+
String createTableSql =
1116+
String.format(
1117+
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
1118+
+ " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n"
1119+
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n"
1120+
+ " weight FLOAT COMMENT 'column comment of weight'\n"
1121+
+ ")\n"
1122+
+ "COMMENT 'table comment of products';",
1123+
inventoryDatabase.getDatabaseName(), "products_with_comments2");
1124+
executeSql(inventoryDatabase, createTableSql);
1125+
1126+
// add some column
1127+
String addColumnSql =
1128+
String.format(
1129+
"ALTER TABLE `%s`.`products_with_comments2` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';",
1130+
inventoryDatabase.getDatabaseName());
1131+
executeSql(inventoryDatabase, addColumnSql);
1132+
1133+
List<Event> expectedEvents = new ArrayList<>();
1134+
CreateTableEvent productCreateTableEvent = getProductsCreateTableEvent(tableId);
1135+
expectedEvents.add(productCreateTableEvent);
1136+
// generate snapshot data
1137+
List<Event> productExpectedSnapshot = getSnapshotExpected(tableId);
1138+
expectedEvents.addAll(productExpectedSnapshot);
1139+
1140+
List<Event> newTableExpectedEvents = getEventsWithComments(newTableId);
1141+
expectedEvents.addAll(newTableExpectedEvents);
1142+
1143+
List<Event> actual = fetchResults(events, expectedEvents.size());
1144+
assertEqualsInAnyOrder(
1145+
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
1146+
actual.stream().map(Object::toString).collect(Collectors.toList()));
1147+
}
1148+
10791149
private void executeSql(UniqueDatabase database, String sql) throws SQLException {
10801150
try (Connection connection = database.getJdbcConnection();
10811151
Statement statement = connection.createStatement()) {

0 commit comments

Comments
 (0)