Skip to content

Commit ec22580

Browse files
committed
fix: polardbx test case
1 parent cc00c92 commit ec22580

File tree

4 files changed

+36
-42
lines changed

4 files changed

+36
-42
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/LegacyMySqlSourceITCase.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import java.util.Map;
5050
import java.util.Objects;
5151

52-
import static org.junit.Assert.assertTrue;
53-
5452
/** Integration tests for the legacy {@link MySqlSource}. */
5553
public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {
5654

@@ -113,9 +111,8 @@ private void testConsumingAllEventsWithJsonFormat(
113111
// check the snapshot result
114112
CloseableIterator<Row> snapshot = result.collect();
115113
waitForSnapshotStarted(snapshot);
116-
assertTrue(
117-
dataInJsonIsEquals(
118-
fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));
114+
115+
assertJsonEquals(extractJsonBody(snapshot.next()), expectSnapshot);
119116
try (Connection connection = fullTypesDatabase.getJdbcConnection();
120117
Statement statement = connection.createStatement()) {
121118
statement.execute(
@@ -124,10 +121,8 @@ private void testConsumingAllEventsWithJsonFormat(
124121

125122
// check the binlog result
126123
CloseableIterator<Row> binlog = result.collect();
127-
JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
128-
assertTrue(
129-
dataInJsonIsEquals(
130-
fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
124+
JsonNode expectBinlog = expected.get("expected_binlog");
125+
assertJsonEquals(extractJsonBody(binlog.next()), expectBinlog);
131126
result.getJobClient().get().cancel().get();
132127
}
133128

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxCharsetITCase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ public void testCharset() throws Exception {
167167
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
168168
+ ")",
169169
testName,
170-
HOST_NAME,
171-
PORT,
170+
getHost(),
171+
getPort(),
172172
USER_NAME,
173173
PASSWORD,
174174
DATABASE,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceITCase.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void testSingleKey() throws Exception {
8383
+ " 'server-time-zone' = 'UTC',"
8484
+ " 'server-id' = '%s'"
8585
+ ")",
86-
HOST_NAME,
87-
PORT,
86+
getHost(),
87+
getPort(),
8888
USER_NAME,
8989
PASSWORD,
9090
DATABASE,
@@ -234,8 +234,8 @@ public void testFullTypesDdl() throws Exception {
234234
+ " 'server-time-zone' = 'UTC',"
235235
+ " 'server-id' = '%s'"
236236
+ ")",
237-
HOST_NAME,
238-
PORT,
237+
getHost(),
238+
getPort(),
239239
USER_NAME,
240240
PASSWORD,
241241
DATABASE,
@@ -301,8 +301,8 @@ public void testMultiKeys() throws Exception {
301301
+ " 'server-time-zone' = 'UTC',"
302302
+ " 'server-id' = '%s'"
303303
+ ")",
304-
HOST_NAME,
305-
PORT,
304+
getHost(),
305+
getPort(),
306306
USER_NAME,
307307
PASSWORD,
308308
DATABASE,

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/polardbx/PolardbxSourceTestBase.java

+24-25
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717

1818
package org.apache.flink.cdc.connectors.polardbx;
1919

20+
import org.apache.flink.cdc.common.utils.TestCaseUtils;
2021
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
2122
import org.apache.flink.test.util.AbstractTestBase;
2223
import org.apache.flink.types.Row;
2324

24-
import com.github.dockerjava.api.model.ExposedPort;
25-
import com.github.dockerjava.api.model.PortBinding;
26-
import com.github.dockerjava.api.model.Ports;
2725
import org.apache.commons.lang3.StringUtils;
2826
import org.junit.AfterClass;
2927
import org.junit.BeforeClass;
@@ -64,36 +62,37 @@
6462
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
6563
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
6664
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
67-
protected static final Integer PORT = 8527;
68-
protected static final String HOST_NAME = "127.0.0.1";
69-
protected static final String USER_NAME = "polardbx_root";
70-
protected static final String PASSWORD = "123456";
65+
7166
private static final String IMAGE_VERSION = "2.1.0";
7267
private static final DockerImageName POLARDBX_IMAGE =
7368
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
7469

70+
protected static final Integer INNER_PORT = 8527;
71+
protected static final String USER_NAME = "polardbx_root";
72+
protected static final String PASSWORD = "123456";
73+
protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);
74+
7575
protected static final GenericContainer POLARDBX_CONTAINER =
7676
new GenericContainer<>(POLARDBX_IMAGE)
77-
.withExposedPorts(PORT)
77+
.withExposedPorts(INNER_PORT)
7878
.withLogConsumer(new Slf4jLogConsumer(LOG))
79-
.withStartupTimeout(Duration.ofMinutes(3))
80-
.withCreateContainerCmdModifier(
81-
c ->
82-
c.withPortBindings(
83-
new PortBinding(
84-
Ports.Binding.bindPort(PORT),
85-
new ExposedPort(PORT))));
79+
.withStartupTimeout(Duration.ofMinutes(3));
80+
81+
protected static String getHost() {
82+
return POLARDBX_CONTAINER.getHost();
83+
}
84+
85+
protected static int getPort() {
86+
return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
87+
}
8688

8789
@BeforeClass
88-
public static void startContainers() throws InterruptedException {
89-
// no need to start container when the port 8527 is listening
90-
if (!checkConnection()) {
91-
LOG.info("Polardbx connection is not valid, so try to start containers...");
92-
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
93-
LOG.info("Containers are started.");
94-
// here should wait 10s that make sure the polardbx is ready
95-
Thread.sleep(10 * 1000);
96-
}
90+
public static void startContainers() {
91+
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
92+
LOG.info("Containers are started.");
93+
94+
TestCaseUtils.repeatedCheck(
95+
PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
9796
}
9897

9998
@AfterClass
@@ -104,7 +103,7 @@ public static void stopContainers() {
104103
}
105104

106105
protected static String getJdbcUrl() {
107-
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
106+
return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
108107
}
109108

110109
protected static Connection getJdbcConnection() throws SQLException {

0 commit comments

Comments
 (0)