Skip to content

Commit 6e6a991

Browse files
committed
[FLINK-35852] Fix decimal precision mismatch after transformation
Somehow this has been fixed in FLINK-35272. Just added an E2e case to verify if it works as expected.
1 parent e58cfd0 commit 6e6a991

File tree

2 files changed

+362
-0
lines changed

2 files changed

+362
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests;
19+
20+
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
22+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
23+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
24+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
25+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
26+
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.BeforeClass;
30+
import org.junit.ClassRule;
31+
import org.junit.Test;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import org.testcontainers.containers.Container;
35+
import org.testcontainers.containers.output.Slf4jLogConsumer;
36+
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
37+
import org.testcontainers.lifecycle.Startables;
38+
39+
import java.nio.file.Path;
40+
import java.sql.Connection;
41+
import java.sql.DriverManager;
42+
import java.sql.ResultSet;
43+
import java.sql.SQLException;
44+
import java.sql.Statement;
45+
import java.time.Duration;
46+
import java.time.temporal.ChronoUnit;
47+
import java.util.ArrayList;
48+
import java.util.Arrays;
49+
import java.util.List;
50+
import java.util.stream.Collectors;
51+
import java.util.stream.Stream;
52+
53+
import static org.junit.Assert.assertArrayEquals;
54+
import static org.junit.Assert.assertEquals;
55+
import static org.junit.Assert.assertTrue;
56+
57+
/** End-to-end tests for complex data types. */
58+
public class ComplexDataTypesE2eITCase extends PipelineTestEnvironment {
59+
private static final Logger LOG = LoggerFactory.getLogger(ComplexDataTypesE2eITCase.class);
60+
61+
// ------------------------------------------------------------------------------------------
62+
// MySQL Variables (we always use MySQL as the data source for easier verifying)
63+
// ------------------------------------------------------------------------------------------
64+
protected static final String MYSQL_TEST_USER = "mysqluser";
65+
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
66+
protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
67+
public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
68+
public static final int TESTCASE_TIMEOUT_SECONDS = 60;
69+
70+
@ClassRule
71+
public static final MySqlContainer MYSQL =
72+
(MySqlContainer)
73+
new MySqlContainer(
74+
MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
75+
.withConfigurationOverride("docker/mysql/my.cnf")
76+
.withSetupSQL("docker/mysql/setup.sql")
77+
.withDatabaseName("flink-test")
78+
.withUsername("flinkuser")
79+
.withPassword("flinkpw")
80+
.withNetwork(NETWORK)
81+
.withNetworkAliases("mysql")
82+
.withLogConsumer(new Slf4jLogConsumer(LOG));
83+
84+
@ClassRule
85+
public static final DorisContainer DORIS =
86+
new DorisContainer(NETWORK)
87+
.withNetworkAliases("doris")
88+
.withLogConsumer(new Slf4jLogConsumer(LOG));
89+
90+
protected final UniqueDatabase complexDataTypesDatabase =
91+
new UniqueDatabase(MYSQL, "data_types_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
92+
93+
@BeforeClass
94+
public static void initializeContainers() {
95+
LOG.info("Starting containers...");
96+
Startables.deepStart(Stream.of(MYSQL)).join();
97+
Startables.deepStart(Stream.of(DORIS)).join();
98+
LOG.info("Waiting for backends to be available");
99+
long startWaitingTimestamp = System.currentTimeMillis();
100+
101+
new LogMessageWaitStrategy()
102+
.withRegEx(".*get heartbeat from FE.*")
103+
.withTimes(1)
104+
.withStartupTimeout(
105+
Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
106+
.waitUntilReady(DORIS);
107+
108+
while (!checkBackendAvailability()) {
109+
try {
110+
if (System.currentTimeMillis() - startWaitingTimestamp
111+
> DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) {
112+
throw new RuntimeException("Doris backend startup timed out.");
113+
}
114+
LOG.info("Waiting for backends to be available");
115+
Thread.sleep(1000);
116+
} catch (InterruptedException ignored) {
117+
// ignore and check next round
118+
}
119+
}
120+
LOG.info("Containers are started.");
121+
}
122+
123+
@Before
124+
public void before() throws Exception {
125+
super.before();
126+
complexDataTypesDatabase.createAndInitialize();
127+
createDorisDatabase(complexDataTypesDatabase.getDatabaseName());
128+
}
129+
130+
private static boolean checkBackendAvailability() {
131+
try {
132+
Container.ExecResult rs =
133+
DORIS.execInContainer(
134+
"mysql",
135+
"--protocol=TCP",
136+
"-uroot",
137+
"-P9030",
138+
"-h127.0.0.1",
139+
"-e SHOW BACKENDS\\G");
140+
141+
if (rs.getExitCode() != 0) {
142+
return false;
143+
}
144+
String output = rs.getStdout();
145+
LOG.info("Doris backend status:\n{}", output);
146+
return output.contains("*************************** 1. row ***************************")
147+
&& !output.contains("AvailCapacity: 1.000 B");
148+
} catch (Exception e) {
149+
LOG.info("Failed to check backend status.", e);
150+
return false;
151+
}
152+
}
153+
154+
@After
155+
public void after() {
156+
super.after();
157+
complexDataTypesDatabase.dropDatabase();
158+
dropDorisDatabase(complexDataTypesDatabase.getDatabaseName());
159+
}
160+
161+
@Test
162+
public void testSyncWholeDatabase() throws Exception {
163+
String pipelineJob =
164+
String.format(
165+
"source:\n"
166+
+ " type: mysql\n"
167+
+ " hostname: mysql\n"
168+
+ " port: 3306\n"
169+
+ " username: %s\n"
170+
+ " password: %s\n"
171+
+ " tables: %s.\\.*\n"
172+
+ " server-id: 5400-5404\n"
173+
+ " server-time-zone: UTC\n"
174+
+ "\n"
175+
+ "sink:\n"
176+
+ " type: doris\n"
177+
+ " fenodes: doris:8030\n"
178+
+ " benodes: doris:8040\n"
179+
+ " username: %s\n"
180+
+ " password: \"%s\"\n"
181+
+ " table.create.properties.replication_num: 1\n"
182+
+ "\n"
183+
+ "transform:\n"
184+
+ " - source-table: %s.DATA_TYPES_TABLE\n"
185+
+ " projection: \\*, 'fine' AS FINE\n"
186+
+ "pipeline:\n"
187+
+ " parallelism: 1",
188+
MYSQL_TEST_USER,
189+
MYSQL_TEST_PASSWORD,
190+
complexDataTypesDatabase.getDatabaseName(),
191+
DORIS.getUsername(),
192+
DORIS.getPassword(),
193+
complexDataTypesDatabase.getDatabaseName());
194+
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
195+
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
196+
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
197+
submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar);
198+
waitUntilJobRunning(Duration.ofSeconds(30));
199+
LOG.info("Pipeline job is running");
200+
validateSinkResult(
201+
complexDataTypesDatabase.getDatabaseName(),
202+
"DATA_TYPES_TABLE",
203+
4,
204+
Arrays.asList(
205+
"1001 | 2012-12-21 17:00:02 | 100.00 | fine",
206+
"1002 | 2012-12-21 17:00:03 | 100.10 | fine",
207+
"1003 | 2012-12-21 17:00:05 | 100.86 | fine"));
208+
209+
LOG.info("Begin incremental reading stage.");
210+
// generate binlogs
211+
String mysqlJdbcUrl =
212+
String.format(
213+
"jdbc:mysql://%s:%s/%s",
214+
MYSQL.getHost(),
215+
MYSQL.getDatabasePort(),
216+
complexDataTypesDatabase.getDatabaseName());
217+
try (Connection conn =
218+
DriverManager.getConnection(
219+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
220+
Statement stat = conn.createStatement()) {
221+
222+
stat.execute(
223+
"INSERT INTO DATA_TYPES_TABLE VALUES (1004, '2012-12-21 17:00:07', 110.37);");
224+
225+
validateSinkResult(
226+
complexDataTypesDatabase.getDatabaseName(),
227+
"DATA_TYPES_TABLE",
228+
4,
229+
Arrays.asList(
230+
"1001 | 2012-12-21 17:00:02 | 100.00 | fine",
231+
"1002 | 2012-12-21 17:00:03 | 100.10 | fine",
232+
"1003 | 2012-12-21 17:00:05 | 100.86 | fine",
233+
"1004 | 2012-12-21 17:00:07 | 110.37 | fine"));
234+
} catch (SQLException e) {
235+
LOG.error("Update table for CDC failed.", e);
236+
throw e;
237+
}
238+
}
239+
240+
public static void createDorisDatabase(String databaseName) {
241+
try {
242+
Container.ExecResult rs =
243+
DORIS.execInContainer(
244+
"mysql",
245+
"--protocol=TCP",
246+
"-uroot",
247+
"-P9030",
248+
"-h127.0.0.1",
249+
String.format("-e CREATE DATABASE IF NOT EXISTS `%s`;", databaseName));
250+
251+
if (rs.getExitCode() != 0) {
252+
throw new RuntimeException("Failed to create database." + rs.getStderr());
253+
}
254+
} catch (Exception e) {
255+
throw new RuntimeException("Failed to create database.", e);
256+
}
257+
}
258+
259+
public static void dropDorisDatabase(String databaseName) {
260+
try {
261+
Container.ExecResult rs =
262+
DORIS.execInContainer(
263+
"mysql",
264+
"--protocol=TCP",
265+
"-uroot",
266+
"-P9030",
267+
"-h127.0.0.1",
268+
String.format("-e DROP DATABASE IF EXISTS %s;", databaseName));
269+
270+
if (rs.getExitCode() != 0) {
271+
throw new RuntimeException("Failed to drop database." + rs.getStderr());
272+
}
273+
} catch (Exception e) {
274+
throw new RuntimeException("Failed to drop database.", e);
275+
}
276+
}
277+
278+
private void validateSinkResult(
279+
String databaseName, String tableName, int columnCount, List<String> expected)
280+
throws Exception {
281+
long startWaitingTimestamp = System.currentTimeMillis();
282+
while (true) {
283+
if (System.currentTimeMillis() - startWaitingTimestamp
284+
> TESTCASE_TIMEOUT_SECONDS * 1000) {
285+
throw new RuntimeException("Doris backend startup timed out.");
286+
}
287+
List<String> results = new ArrayList<>();
288+
try (Connection conn =
289+
DriverManager.getConnection(
290+
DORIS.getJdbcUrl(databaseName, DORIS.getUsername()));
291+
Statement stat = conn.createStatement()) {
292+
ResultSet rs =
293+
stat.executeQuery(
294+
String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName));
295+
296+
while (rs.next()) {
297+
List<String> columns = new ArrayList<>();
298+
for (int i = 1; i <= columnCount; i++) {
299+
try {
300+
columns.add(rs.getString(i));
301+
} catch (SQLException ignored) {
302+
// Column count could change after schema evolution
303+
columns.add(null);
304+
}
305+
}
306+
results.add(String.join(" | ", columns));
307+
}
308+
309+
if (expected.size() == results.size()) {
310+
assertEqualsInAnyOrder(expected, results);
311+
break;
312+
} else {
313+
Thread.sleep(1000);
314+
}
315+
} catch (SQLException e) {
316+
LOG.info("Validate sink result failure, waiting for next turn...", e);
317+
Thread.sleep(1000);
318+
}
319+
}
320+
}
321+
322+
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
323+
assertTrue(expected != null && actual != null);
324+
assertEqualsInOrder(
325+
expected.stream().sorted().collect(Collectors.toList()),
326+
actual.stream().sorted().collect(Collectors.toList()));
327+
}
328+
329+
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
330+
assertTrue(expected != null && actual != null);
331+
assertEquals(expected.size(), actual.size());
332+
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
333+
}
334+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the "License"); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
DROP TABLE IF EXISTS DATA_TYPES_TABLE;
17+
18+
CREATE TABLE DATA_TYPES_TABLE (
19+
ID INT NOT NULL,
20+
TS DATETIME(0),
21+
NUM DECIMAL(10, 2),
22+
PRIMARY KEY (ID)
23+
);
24+
25+
INSERT INTO DATA_TYPES_TABLE VALUES (1001, '2012-12-21 17:00:02', 100.00);
26+
INSERT INTO DATA_TYPES_TABLE VALUES (1002, '2012-12-21 17:00:03', 100.10);
27+
INSERT INTO DATA_TYPES_TABLE VALUES (1003, '2012-12-21 17:00:05', 100.86);
28+

0 commit comments

Comments
 (0)