Skip to content

Commit 0aca5d5

Browse files
committed
fix: polardbx test case
1 parent c527f43 commit 0aca5d5

File tree

5 files changed

+76
-75
lines changed

5 files changed

+76
-75
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java

+12-17
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@
9696
import java.util.concurrent.FutureTask;
9797
import java.util.concurrent.TimeUnit;
9898
import java.util.concurrent.TimeoutException;
99-
import java.util.stream.Collectors;
10099

101100
import static java.lang.String.format;
102101
import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -347,7 +346,10 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception {
347346
// Check all snapshot records are sent with exactly-once semantics
348347
assertEqualsInAnyOrder(
349348
Arrays.asList(expectedSnapshotData),
350-
fetchAndConvert(iterator, expectedSnapshotData.length, RowData::toString));
349+
fetchAndConvert(
350+
iterator,
351+
expectedSnapshotData.length,
352+
MySqlSourceITCase::convertRowDataToRowString));
351353
assertTrue(!hasNextData(iterator));
352354
jobClient.cancel().get();
353355
}
@@ -1073,26 +1075,19 @@ private void checkBinlogData(
10731075
assertThat(iterator.hasNext()).isFalse();
10741076
}
10751077

1076-
private static List<String> convertRowDataToRowString(List<RowData> rows) {
1078+
private static String convertRowDataToRowString(RowData row) {
10771079
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
10781080
map.put("id", 0);
10791081
map.put("name", 1);
10801082
map.put("address", 2);
10811083
map.put("phone_number", 3);
1082-
return rows.stream()
1083-
.map(
1084-
row ->
1085-
RowUtils.createRowWithNamedPositions(
1086-
row.getRowKind(),
1087-
new Object[] {
1088-
row.getLong(0),
1089-
row.getString(1),
1090-
row.getString(2),
1091-
row.getString(3)
1092-
},
1093-
map)
1094-
.toString())
1095-
.collect(Collectors.toList());
1084+
return RowUtils.createRowWithNamedPositions(
1085+
row.getRowKind(),
1086+
new Object[] {
1087+
row.getLong(0), row.getString(1), row.getString(2), row.getString(3)
1088+
},
1089+
map)
1090+
.toString();
10961091
}
10971092

10981093
private String getTableNameRegex(String[] captureCustomerTables) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,10 @@ public void testRemoveAndAddNewTable() throws Exception {
489489
.collect(Collectors.toList())
490490
: expectedCustomersResult;
491491
List<String> rows =
492-
fetchAndConvert(iterator, expectedSnapshotResult.size(), RowData::toString);
492+
fetchAndConvert(
493+
iterator,
494+
expectedSnapshotResult.size(),
495+
NewlyAddedTableITCase::convertRowDataToRowString);
493496
assertEqualsInAnyOrder(expectedSnapshotResult, rows);
494497

495498
// make binlog events
@@ -505,7 +508,11 @@ public void testRemoveAndAddNewTable() throws Exception {
505508
"UPDATE " + tableId + " SET address = 'Update2' where id = 103");
506509
connection.commit();
507510
}
508-
rows = fetchAndConvert(iterator, expectedBinlogResult.size(), RowData::toString);
511+
rows =
512+
fetchAndConvert(
513+
iterator,
514+
expectedBinlogResult.size(),
515+
NewlyAddedTableITCase::convertRowDataToRowString);
509516
assertEqualsInAnyOrder(expectedBinlogResult, rows);
510517

511518
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
@@ -542,28 +549,24 @@ protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stre
542549
return iterator;
543550
}
544551

545-
private static List<String> convertRowDataToRowString(List<RowData> rows) {
552+
private static String convertRowDataToRowString(RowData row) {
546553
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
547554
map.put("id", 0);
548555
map.put("name", 1);
549556
map.put("address", 2);
550557
map.put("phone_number", 3);
551558
map.put("_table_name", 4);
552-
return rows.stream()
553-
.map(
554-
row ->
555-
RowUtils.createRowWithNamedPositions(
556-
row.getRowKind(),
557-
new Object[] {
558-
row.getLong(0),
559-
row.getString(1),
560-
row.getString(2),
561-
row.getString(3),
562-
row.getString(4)
563-
},
564-
map)
565-
.toString())
566-
.collect(Collectors.toList());
559+
return RowUtils.createRowWithNamedPositions(
560+
row.getRowKind(),
561+
new Object[] {
562+
row.getLong(0),
563+
row.getString(1),
564+
row.getString(2),
565+
row.getString(3),
566+
row.getString(4)
567+
},
568+
map)
569+
.toString();
567570
}
568571

569572
private void testRemoveTablesOneByOne(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,8 @@ public void testCharset() throws Exception {
170170
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
171171
+ ")",
172172
testName,
173-
HOST_NAME,
174-
PORT,
173+
getHost(),
174+
getPort(),
175175
USER_NAME,
176176
PASSWORD,
177177
DATABASE,
@@ -189,7 +189,7 @@ public void testCharset() throws Exception {
189189
waitForSnapshotStarted(iterator);
190190
assertEqualsInAnyOrder(
191191
Arrays.asList(snapshotExpected),
192-
fetchAndConvert(iterator, snapshotExpected.length, Row::toString));
192+
fetchAndConvert(iterator, snapshotExpected.length, WAITING_TIMEOUT, Row::toString));
193193

194194
// test binlog phase
195195
try (Connection connection = getJdbcConnection();
@@ -201,7 +201,7 @@ public void testCharset() throws Exception {
201201
}
202202
assertEqualsInAnyOrder(
203203
Arrays.asList(binlogExpected),
204-
fetchAndConvert(iterator, binlogExpected.length, Row::toString));
204+
fetchAndConvert(iterator, binlogExpected.length, WAITING_TIMEOUT, Row::toString));
205205
result.getJobClient().get().cancel().get();
206206
}
207207
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ public void testSingleKey() throws Exception {
8585
+ " 'server-time-zone' = 'UTC',"
8686
+ " 'server-id' = '%s'"
8787
+ ")",
88-
HOST_NAME,
89-
PORT,
88+
getHost(),
89+
getPort(),
9090
USER_NAME,
9191
PASSWORD,
9292
DATABASE,
@@ -111,7 +111,8 @@ public void testSingleKey() throws Exception {
111111
}
112112

113113
List<String> realSnapshotData =
114-
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
114+
fetchAndConvert(
115+
iterator, expectedSnapshotData.size(), WAITING_TIMEOUT, Row::toString);
115116
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
116117

117118
// second step: check the sink data
@@ -158,7 +159,8 @@ public void testSingleKey() throws Exception {
158159
for (int i = 0; i < captureCustomerTables.length; i++) {
159160
expectedBinlogData.addAll(Arrays.asList(expectedBinlog));
160161
}
161-
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
162+
List<String> realBinlog =
163+
fetchAndConvert(iterator, expectedBinlog.length, WAITING_TIMEOUT, Row::toString);
162164
assertEqualsInOrder(expectedBinlogData, realBinlog);
163165
tableResult.getJobClient().get().cancel().get();
164166
}
@@ -237,8 +239,8 @@ public void testFullTypesDdl() throws Exception {
237239
+ " 'server-time-zone' = 'UTC',"
238240
+ " 'server-id' = '%s'"
239241
+ ")",
240-
HOST_NAME,
241-
PORT,
242+
getHost(),
243+
getPort(),
242244
USER_NAME,
243245
PASSWORD,
244246
DATABASE,
@@ -248,7 +250,8 @@ public void testFullTypesDdl() throws Exception {
248250

249251
TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types");
250252
CloseableIterator<Row> iterator = tableResult.collect();
251-
List<String> realSnapshotData = fetchAndConvert(iterator, 1, Row::toString);
253+
List<String> realSnapshotData =
254+
fetchAndConvert(iterator, 1, WAITING_TIMEOUT, Row::toString);
252255
String[] expectedSnapshotData =
253256
new String[] {
254257
"+I[100001, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, "
@@ -304,8 +307,8 @@ public void testMultiKeys() throws Exception {
304307
+ " 'server-time-zone' = 'UTC',"
305308
+ " 'server-id' = '%s'"
306309
+ ")",
307-
HOST_NAME,
308-
PORT,
310+
getHost(),
311+
getPort(),
309312
USER_NAME,
310313
PASSWORD,
311314
DATABASE,
@@ -330,7 +333,8 @@ public void testMultiKeys() throws Exception {
330333
}
331334

332335
List<String> realSnapshotData =
333-
fetchAndConvert(iterator, expectedSnapshotData.size(), Row::toString);
336+
fetchAndConvert(
337+
iterator, expectedSnapshotData.size(), WAITING_TIMEOUT, Row::toString);
334338
assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
335339

336340
// second step: check the sink data
@@ -379,7 +383,8 @@ public void testMultiKeys() throws Exception {
379383
"+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
380384
"-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
381385
};
382-
List<String> realBinlog = fetchAndConvert(iterator, expectedBinlog.length, Row::toString);
386+
List<String> realBinlog =
387+
fetchAndConvert(iterator, expectedBinlog.length, WAITING_TIMEOUT, Row::toString);
383388
assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
384389
tableResult.getJobClient().get().cancel().get();
385390
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java

+23-25
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
import org.apache.flink.cdc.common.testutils.TestCaseUtils;
2121
import org.apache.flink.test.util.AbstractTestBase;
2222

23-
import com.github.dockerjava.api.model.ExposedPort;
24-
import com.github.dockerjava.api.model.PortBinding;
25-
import com.github.dockerjava.api.model.Ports;
2623
import org.apache.commons.lang3.StringUtils;
2724
import org.junit.AfterClass;
2825
import org.junit.BeforeClass;
@@ -61,36 +58,37 @@
6158
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
6259
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
6360
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
64-
protected static final Integer PORT = 8527;
65-
protected static final String HOST_NAME = "127.0.0.1";
66-
protected static final String USER_NAME = "polardbx_root";
67-
protected static final String PASSWORD = "123456";
61+
6862
private static final String IMAGE_VERSION = "2.1.0";
6963
private static final DockerImageName POLARDBX_IMAGE =
7064
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
7165

66+
protected static final Integer INNER_PORT = 8527;
67+
protected static final String USER_NAME = "polardbx_root";
68+
protected static final String PASSWORD = "123456";
69+
protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);
70+
7271
protected static final GenericContainer POLARDBX_CONTAINER =
7372
new GenericContainer<>(POLARDBX_IMAGE)
74-
.withExposedPorts(PORT)
73+
.withExposedPorts(INNER_PORT)
7574
.withLogConsumer(new Slf4jLogConsumer(LOG))
76-
.withStartupTimeout(Duration.ofMinutes(3))
77-
.withCreateContainerCmdModifier(
78-
c ->
79-
c.withPortBindings(
80-
new PortBinding(
81-
Ports.Binding.bindPort(PORT),
82-
new ExposedPort(PORT))));
75+
.withStartupTimeout(Duration.ofMinutes(3));
76+
77+
protected static String getHost() {
78+
return POLARDBX_CONTAINER.getHost();
79+
}
80+
81+
protected static int getPort() {
82+
return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
83+
}
8384

8485
@BeforeClass
85-
public static void startContainers() throws InterruptedException {
86-
// no need to start container when the port 8527 is listening
87-
if (!checkConnection()) {
88-
LOG.info("Polardbx connection is not valid, so try to start containers...");
89-
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
90-
LOG.info("Containers are started.");
91-
// here should wait 10s that make sure the polardbx is ready
92-
Thread.sleep(10 * 1000);
93-
}
86+
public static void startContainers() {
87+
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
88+
LOG.info("Containers are started.");
89+
90+
TestCaseUtils.repeatedCheck(
91+
PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
9492
}
9593

9694
@AfterClass
@@ -101,7 +99,7 @@ public static void stopContainers() {
10199
}
102100

103101
protected static String getJdbcUrl() {
104-
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
102+
return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
105103
}
106104

107105
protected static Connection getJdbcConnection() throws SQLException {

0 commit comments

Comments
 (0)