Skip to content

Commit c958daf

Browse files
authored
[FLINK-35540][cdc-common] Fix table missed when database and table are with the same name
This closes #3396.
1 parent 7d35f3c commit c958daf

File tree

4 files changed

+91
-11
lines changed

4 files changed

+91
-11
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,15 @@ public SelectorsBuilder includeTables(String tableInclusions) {
9898
Predicates.setOf(
9999
tableInclusions, Predicates.RegExSplitterByComma::split, (str) -> str);
100100
for (String tableSplit : tableSplitSet) {
101-
Set<String> tableIdSet =
102-
Predicates.setOf(
101+
List<String> tableIdList =
102+
Predicates.listOf(
103103
tableSplit, Predicates.RegExSplitterByDot::split, (str) -> str);
104-
Iterator<String> iterator = tableIdSet.iterator();
105-
if (tableIdSet.size() == 1) {
104+
Iterator<String> iterator = tableIdList.iterator();
105+
if (tableIdList.size() == 1) {
106106
selectors.add(new Selector(null, null, iterator.next()));
107-
} else if (tableIdSet.size() == 2) {
107+
} else if (tableIdList.size() == 2) {
108108
selectors.add(new Selector(null, iterator.next(), iterator.next()));
109-
} else if (tableIdSet.size() == 3) {
109+
} else if (tableIdList.size() == 3) {
110110
selectors.add(new Selector(iterator.next(), iterator.next(), iterator.next()));
111111
} else {
112112
throw new IllegalArgumentException(

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/Predicates.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collection;
2525
import java.util.Collections;
2626
import java.util.LinkedHashSet;
27+
import java.util.LinkedList;
2728
import java.util.List;
2829
import java.util.Optional;
2930
import java.util.Set;
@@ -78,6 +79,21 @@ public static <T> Set<T> setOf(
7879
return matches;
7980
}
8081

82+
public static <T> List<T> listOf(
83+
String input, Function<String, String[]> splitter, Function<String, T> factory) {
84+
if (input == null) {
85+
return Collections.emptyList();
86+
}
87+
List<T> matches = new LinkedList<>();
88+
for (String item : splitter.apply(input)) {
89+
T obj = factory.apply(item);
90+
if (obj != null) {
91+
matches.add(obj);
92+
}
93+
}
94+
return matches;
95+
}
96+
8197
protected static <T> Function<T, Optional<Pattern>> matchedByPattern(
8298
Collection<Pattern> patterns, Function<T, String> conversion) {
8399
return (t) -> {

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/schema/SelectorsTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ public void testTableSelector() {
3232
// nameSpace, schemaName, tableName
3333
Selectors selectors =
3434
new Selectors.SelectorsBuilder()
35-
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+")
35+
.includeTables("db.sc1.A[0-9]+,db.sc2.B[0-1]+,db.sc1.sc1")
3636
.build();
3737

38+
assertAllowed(selectors, "db", "sc1", "sc1");
3839
assertAllowed(selectors, "db", "sc1", "A1");
3940
assertAllowed(selectors, "db", "sc1", "A2");
4041
assertAllowed(selectors, "db", "sc2", "B0");
@@ -50,9 +51,12 @@ public void testTableSelector() {
5051

5152
selectors =
5253
new Selectors.SelectorsBuilder()
53-
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+")
54+
.includeTables("db\\..sc1.A[0-9]+,db.sc2.B[0-1]+,db\\..sc1.sc1,db.sc1.sc1")
5455
.build();
5556

57+
assertAllowed(selectors, "db", "sc1", "sc1");
58+
assertAllowed(selectors, "db1", "sc1", "sc1");
59+
assertAllowed(selectors, "dba", "sc1", "sc1");
5660
assertAllowed(selectors, "db1", "sc1", "A1");
5761
assertAllowed(selectors, "dba", "sc1", "A2");
5862
assertAllowed(selectors, "db", "sc2", "B0");
@@ -68,8 +72,11 @@ public void testTableSelector() {
6872

6973
// schemaName, tableName
7074
selectors =
71-
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
75+
new Selectors.SelectorsBuilder()
76+
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
77+
.build();
7278

79+
assertAllowed(selectors, null, "sc1", "sc1");
7380
assertAllowed(selectors, null, "sc1", "A1");
7481
assertAllowed(selectors, null, "sc1", "A2");
7582
assertAllowed(selectors, null, "sc2", "B0");
@@ -82,8 +89,12 @@ public void testTableSelector() {
8289
assertNotAllowed(selectors, null, "sc1A", "A1");
8390

8491
// tableName
85-
selectors = new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+").build();
92+
selectors =
93+
new Selectors.SelectorsBuilder().includeTables("\\.A[0-9]+,B[0-1]+,sc1").build();
8694

95+
assertAllowed(selectors, null, null, "sc1");
96+
assertNotAllowed(selectors, "db", "sc1", "sc1");
97+
assertNotAllowed(selectors, null, "sc1", "sc1");
8798
assertAllowed(selectors, null, null, "1A1");
8899
assertAllowed(selectors, null, null, "AA2");
89100
assertAllowed(selectors, null, null, "B0");
@@ -94,8 +105,11 @@ public void testTableSelector() {
94105
assertNotAllowed(selectors, null, null, "2B");
95106

96107
selectors =
97-
new Selectors.SelectorsBuilder().includeTables("sc1.A[0-9]+,sc2.B[0-1]+").build();
108+
new Selectors.SelectorsBuilder()
109+
.includeTables("sc1.A[0-9]+,sc2.B[0-1]+,sc1.sc1")
110+
.build();
98111

112+
assertAllowed(selectors, null, "sc1", "sc1");
99113
assertAllowed(selectors, null, "sc1", "A1");
100114
assertAllowed(selectors, null, "sc1", "A2");
101115
assertAllowed(selectors, null, "sc1", "A2");
@@ -107,6 +121,15 @@ public void testTableSelector() {
107121
assertNotAllowed(selectors, null, "sc2", "B2");
108122
assertNotAllowed(selectors, null, "sc11", "A1");
109123
assertNotAllowed(selectors, null, "sc1A", "A1");
124+
125+
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc1").build();
126+
assertAllowed(selectors, null, "sc1", "sc1");
127+
128+
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.sc[0-9]+").build();
129+
assertAllowed(selectors, null, "sc1", "sc1");
130+
131+
selectors = new Selectors.SelectorsBuilder().includeTables("sc1.\\.*").build();
132+
assertAllowed(selectors, null, "sc1", "sc1");
110133
}
111134

112135
protected void assertAllowed(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import org.junit.Test;
2626

27+
import java.sql.Connection;
28+
import java.sql.SQLException;
29+
import java.sql.Statement;
2730
import java.util.Arrays;
2831
import java.util.HashMap;
2932
import java.util.Map;
@@ -124,6 +127,44 @@ public void testExcludeAllTable() {
124127
+ tableExclude);
125128
}
126129

130+
@Test
131+
public void testDatabaseAndTableWithTheSameName() throws SQLException {
132+
inventoryDatabase.createAndInitialize();
133+
// create a table with the same name of database
134+
try (Connection connection = inventoryDatabase.getJdbcConnection();
135+
Statement statement = connection.createStatement()) {
136+
String createSameNameTableSql =
137+
String.format(
138+
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
139+
+ " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
140+
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
141+
+ " description VARCHAR(512)\n"
142+
+ ");",
143+
inventoryDatabase.getDatabaseName(),
144+
inventoryDatabase.getDatabaseName());
145+
146+
statement.execute(createSameNameTableSql);
147+
}
148+
Map<String, String> options = new HashMap<>();
149+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
150+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
151+
options.put(USERNAME.key(), TEST_USER);
152+
options.put(PASSWORD.key(), TEST_PASSWORD);
153+
options.put(
154+
TABLES.key(),
155+
inventoryDatabase.getDatabaseName() + "." + inventoryDatabase.getDatabaseName());
156+
Factory.Context context = new MockContext(Configuration.fromMap(options));
157+
158+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
159+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
160+
assertThat(dataSource.getSourceConfig().getTableList())
161+
.isEqualTo(
162+
Arrays.asList(
163+
inventoryDatabase.getDatabaseName()
164+
+ "."
165+
+ inventoryDatabase.getDatabaseName()));
166+
}
167+
127168
class MockContext implements Factory.Context {
128169

129170
Configuration factoryConfiguration;

0 commit comments

Comments
 (0)