Skip to content

Commit f61f0f4

Browse files
authored
[FLINK-35264][cdc][runtime] Fix multiple transform rules do not take effect (#3280)
1 parent 23a67dc commit f61f0f4

File tree

4 files changed

+294
-12
lines changed

4 files changed

+294
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests;
19+
20+
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
22+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
23+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
24+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
25+
import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
26+
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.ClassRule;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.junit.runners.Parameterized;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
import org.testcontainers.containers.output.Slf4jLogConsumer;
36+
37+
import java.nio.file.Path;
38+
import java.sql.Connection;
39+
import java.sql.DriverManager;
40+
import java.sql.SQLException;
41+
import java.sql.Statement;
42+
import java.time.Duration;
43+
import java.util.Arrays;
44+
import java.util.List;
45+
import java.util.concurrent.TimeoutException;
46+
47+
/** E2e tests for the {@link TransformSchemaOperator}. */
48+
@RunWith(Parameterized.class)
49+
public class TransformE2eITCase extends PipelineTestEnvironment {
50+
private static final Logger LOG = LoggerFactory.getLogger(TransformE2eITCase.class);
51+
52+
// ------------------------------------------------------------------------------------------
53+
// MySQL Variables (we always use MySQL as the data source for easier verifying)
54+
// ------------------------------------------------------------------------------------------
55+
protected static final String MYSQL_TEST_USER = "mysqluser";
56+
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
57+
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
58+
protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
59+
60+
@ClassRule
61+
public static final MySqlContainer MYSQL =
62+
(MySqlContainer)
63+
new MySqlContainer(
64+
MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
65+
.withConfigurationOverride("docker/mysql/my.cnf")
66+
.withSetupSQL("docker/mysql/setup.sql")
67+
.withDatabaseName("flink-test")
68+
.withUsername("flinkuser")
69+
.withPassword("flinkpw")
70+
.withNetwork(NETWORK)
71+
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
72+
.withLogConsumer(new Slf4jLogConsumer(LOG));
73+
74+
protected final UniqueDatabase transformRenameDatabase =
75+
new UniqueDatabase(MYSQL, "transform_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
76+
77+
@Before
78+
public void before() throws Exception {
79+
super.before();
80+
transformRenameDatabase.createAndInitialize();
81+
}
82+
83+
@After
84+
public void after() {
85+
super.after();
86+
transformRenameDatabase.dropDatabase();
87+
}
88+
89+
@Test
90+
public void testHeteroSchemaTransform() throws Exception {
91+
String pipelineJob =
92+
String.format(
93+
"source:\n"
94+
+ " type: mysql\n"
95+
+ " hostname: %s\n"
96+
+ " port: 3306\n"
97+
+ " username: %s\n"
98+
+ " password: %s\n"
99+
+ " tables: %s.\\.*\n"
100+
+ " server-id: 5400-5404\n"
101+
+ " server-time-zone: UTC\n"
102+
+ "\n"
103+
+ "sink:\n"
104+
+ " type: values\n"
105+
+ "route:\n"
106+
+ " - source-table: %s.\\.*\n"
107+
+ " sink-table: %s.terminus\n"
108+
+ "transform:\n"
109+
+ " - source-table: %s.TABLEALPHA\n"
110+
+ " projection: ID, VERSION\n"
111+
+ " filter: ID > 1008\n"
112+
+ " - source-table: %s.TABLEBETA\n"
113+
+ " projection: ID, VERSION\n"
114+
+ "\n"
115+
+ "pipeline:\n"
116+
+ " parallelism: 1",
117+
INTER_CONTAINER_MYSQL_ALIAS,
118+
MYSQL_TEST_USER,
119+
MYSQL_TEST_PASSWORD,
120+
transformRenameDatabase.getDatabaseName(),
121+
transformRenameDatabase.getDatabaseName(),
122+
transformRenameDatabase.getDatabaseName(),
123+
transformRenameDatabase.getDatabaseName(),
124+
transformRenameDatabase.getDatabaseName());
125+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
126+
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
127+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
128+
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
129+
waitUntilJobRunning(Duration.ofSeconds(30));
130+
LOG.info("Pipeline job is running");
131+
waitUtilSpecificEvent(
132+
String.format(
133+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
134+
transformRenameDatabase.getDatabaseName()),
135+
6000L);
136+
137+
waitUtilSpecificEvent(
138+
String.format(
139+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
140+
transformRenameDatabase.getDatabaseName()),
141+
6000L);
142+
143+
List<String> expectedEvents =
144+
Arrays.asList(
145+
String.format(
146+
"CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` STRING}, primaryKeys=ID, options=()}",
147+
transformRenameDatabase.getDatabaseName()),
148+
String.format(
149+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, 8.1], op=INSERT, meta=()}",
150+
transformRenameDatabase.getDatabaseName()),
151+
String.format(
152+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1010, 10], op=INSERT, meta=()}",
153+
transformRenameDatabase.getDatabaseName()),
154+
String.format(
155+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
156+
transformRenameDatabase.getDatabaseName()),
157+
String.format(
158+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2011, 11], op=INSERT, meta=()}",
159+
transformRenameDatabase.getDatabaseName()),
160+
String.format(
161+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2012, 12], op=INSERT, meta=()}",
162+
transformRenameDatabase.getDatabaseName()),
163+
String.format(
164+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
165+
transformRenameDatabase.getDatabaseName()));
166+
validateResult(expectedEvents);
167+
LOG.info("Begin incremental reading stage.");
168+
// generate binlogs
169+
String mysqlJdbcUrl =
170+
String.format(
171+
"jdbc:mysql://%s:%s/%s",
172+
MYSQL.getHost(),
173+
MYSQL.getDatabasePort(),
174+
transformRenameDatabase.getDatabaseName());
175+
try (Connection conn =
176+
DriverManager.getConnection(
177+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
178+
Statement stat = conn.createStatement()) {
179+
stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
180+
stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79);");
181+
stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
182+
} catch (SQLException e) {
183+
LOG.error("Update table for CDC failed.", e);
184+
throw e;
185+
}
186+
187+
waitUtilSpecificEvent(
188+
String.format(
189+
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
190+
transformRenameDatabase.getDatabaseName()),
191+
6000L);
192+
193+
waitUtilSpecificEvent(
194+
String.format(
195+
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
196+
transformRenameDatabase.getDatabaseName()),
197+
6000L);
198+
199+
waitUtilSpecificEvent(
200+
String.format(
201+
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
202+
transformRenameDatabase.getDatabaseName()),
203+
6000L);
204+
205+
String stdout = taskManagerConsumer.toUtf8String();
206+
System.out.println(stdout);
207+
}
208+
209+
private void validateResult(List<String> expectedEvents) {
210+
String stdout = taskManagerConsumer.toUtf8String();
211+
for (String event : expectedEvents) {
212+
if (!stdout.contains(event)) {
213+
throw new RuntimeException(
214+
"failed to get specific event: " + event + " from stdout: " + stdout);
215+
}
216+
}
217+
}
218+
219+
private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
220+
boolean result = false;
221+
long endTimeout = System.currentTimeMillis() + timeout;
222+
while (System.currentTimeMillis() < endTimeout) {
223+
String stdout = taskManagerConsumer.toUtf8String();
224+
if (stdout.contains(event)) {
225+
result = true;
226+
break;
227+
}
228+
Thread.sleep(1000);
229+
}
230+
if (!result) {
231+
throw new TimeoutException(
232+
"failed to get specific event: "
233+
+ event
234+
+ " from stdout: "
235+
+ taskManagerConsumer.toUtf8String());
236+
}
237+
}
238+
}

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/mysql_inventory.sql

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
-- Copyright 2023 Ververica Inc.
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the "License"); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
27
--
3-
-- Licensed under the Apache License, Version 2.0 (the "License");
4-
-- you may not use this file except in compliance with the License.
5-
-- You may obtain a copy of the License at
6-
-- http://www.apache.org/licenses/LICENSE-2.0
7-
-- Unless required by applicable law or agreed to in writing,
8-
-- software distributed under the License is distributed on an
9-
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
10-
-- KIND, either express or implied. See the License for the
11-
-- specific language governing permissions and limitations
12-
-- under the License.
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
1315

1416
-- ----------------------------------------------------------------------------------------------------------------
1517
-- DATABASE: mysql_inventory
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the "License"); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
DROP TABLE IF EXISTS TABLEALPHA;
17+
18+
CREATE TABLE TABLEALPHA (
19+
ID INT NOT NULL,
20+
VERSION VARCHAR(17),
21+
PRICEALPHA INT,
22+
PRIMARY KEY (ID)
23+
);
24+
25+
INSERT INTO TABLEALPHA VALUES (1008, '8', 199);
26+
INSERT INTO TABLEALPHA VALUES (1009, '8.1', 0);
27+
INSERT INTO TABLEALPHA VALUES (1010, '10', 99);
28+
INSERT INTO TABLEALPHA VALUES (1011, '11', 59);
29+
30+
DROP TABLE IF EXISTS TABLEBETA;
31+
32+
CREATE TABLE TABLEBETA (
33+
ID INT NOT NULL,
34+
VERSION VARCHAR(17),
35+
CODENAMESBETA VARCHAR(17),
36+
PRIMARY KEY (ID)
37+
);
38+
39+
INSERT INTO TABLEBETA VALUES (2011, '11', 'Big Sur');
40+
INSERT INTO TABLEBETA VALUES (2012, '12', 'Monterey');
41+
INSERT INTO TABLEBETA VALUES (2013, '13', 'Ventura');
42+
INSERT INTO TABLEBETA VALUES (2014, '14', 'Sonoma');

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ private TransformSchemaOperator(
9696
@Override
9797
public void open() throws Exception {
9898
super.open();
99+
transforms = new ArrayList<>();
99100
for (Tuple5<String, String, String, String, String> transformRule : transformRules) {
100101
String tableInclusions = transformRule.f0;
101102
String projection = transformRule.f1;
@@ -104,7 +105,6 @@ public void open() throws Exception {
104105
String tableOptions = transformRule.f4;
105106
Selectors selectors =
106107
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
107-
transforms = new ArrayList<>();
108108
transforms.add(new Tuple2<>(selectors, TransformProjection.of(projection)));
109109
schemaMetadataTransformers.add(
110110
new Tuple2<>(

0 commit comments

Comments
 (0)