21
21
import org .apache .flink .api .common .typeutils .TypeSerializer ;
22
22
import org .apache .flink .cdc .common .utils .TestCaseUtils ;
23
23
import org .apache .flink .cdc .connectors .mysql .debezium .DebeziumUtils ;
24
- import org .apache .flink .cdc .connectors .mysql .testutils .MySqlContainer ;
25
24
import org .apache .flink .cdc .connectors .mysql .testutils .TestTable ;
26
25
import org .apache .flink .cdc .connectors .mysql .testutils .TestTableSchemas ;
27
26
import org .apache .flink .cdc .connectors .mysql .testutils .UniqueDatabase ;
38
37
import org .apache .flink .streaming .api .operators .collect .CollectSinkOperator ;
39
38
import org .apache .flink .streaming .api .operators .collect .CollectSinkOperatorFactory ;
40
39
import org .apache .flink .streaming .api .operators .collect .CollectStreamSink ;
41
- import org .apache .flink .test .junit5 .MiniClusterExtension ;
42
40
import org .apache .flink .util .FlinkRuntimeException ;
43
41
44
42
import io .debezium .connector .mysql .MySqlConnection ;
45
- import org .junit .jupiter .api .AfterAll ;
46
- import org .junit .jupiter .api .AfterEach ;
47
- import org .junit .jupiter .api .BeforeAll ;
48
- import org .junit .jupiter .api .BeforeEach ;
49
- import org .junit .jupiter .api .Test ;
50
- import org .junit .jupiter .api .extension .RegisterExtension ;
43
+ import org .junit .After ;
44
+ import org .junit .Before ;
45
+ import org .junit .Test ;
51
46
import org .junit .rules .TemporaryFolder ;
52
47
import org .slf4j .Logger ;
53
48
import org .slf4j .LoggerFactory ;
54
- import org .testcontainers .containers .output .Slf4jLogConsumer ;
55
49
56
50
import java .io .File ;
57
51
import java .lang .reflect .Field ;
70
64
import java .util .Map ;
71
65
import java .util .Objects ;
72
66
import java .util .Properties ;
67
+ import java .util .Random ;
73
68
import java .util .UUID ;
74
69
70
+ import static org .apache .flink .cdc .connectors .mysql .source .MySqlSourceTestBase .DEFAULT_PARALLELISM ;
71
+
75
72
/**
76
73
* Integration tests for handling schema changes regard to renaming multiple tables within a single
77
74
* statement.
78
75
*/
79
- public class MySqlMultipleTablesRenamingITCase {
76
+ public class MySqlMultipleTablesRenamingITCase extends MySqlSourceTestBase {
80
77
private static final Logger LOG =
81
78
LoggerFactory .getLogger (MySqlMultipleTablesRenamingITCase .class );
82
- @ RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension ();
83
-
84
- @ SuppressWarnings ("unchecked" )
85
- private static final MySqlContainer MYSQL_CONTAINER =
86
- (MySqlContainer )
87
- new MySqlContainer ()
88
- .withConfigurationOverride (
89
- buildMySqlConfigWithTimezone (
90
- getResourceFolder (), getSystemTimeZone ()))
91
- .withSetupSQL ("docker/setup.sql" )
92
- .withDatabaseName ("flink-test" )
93
- .withUsername ("flinkuser" )
94
- .withPassword ("flinkpw" )
95
- .withLogConsumer (new Slf4jLogConsumer (LOG ));
96
79
97
80
private final UniqueDatabase customDatabase =
98
81
new UniqueDatabase (MYSQL_CONTAINER , "customer" , "mysqluser" , "mysqlpw" );
@@ -101,25 +84,15 @@ public class MySqlMultipleTablesRenamingITCase {
101
84
102
85
private MySqlConnection connection ;
103
86
104
- @ BeforeAll
105
- public static void before () throws Exception {
106
- MYSQL_CONTAINER .start ();
107
- }
108
-
109
- @ AfterAll
110
- public static void after () throws Exception {
111
- MYSQL_CONTAINER .stop ();
112
- }
113
-
114
- @ BeforeEach
115
- void prepare () throws Exception {
87
+ @ Before
88
+ public void prepare () throws Exception {
116
89
connection = getConnection ();
117
90
customDatabase .createAndInitialize ();
118
91
flushLogs ();
119
92
}
120
93
121
- @ AfterEach
122
- void tearDown () throws Exception {
94
+ @ After
95
+ public void tearDown () throws Exception {
123
96
customDatabase .dropDatabase ();
124
97
connection .close ();
125
98
}
@@ -146,7 +119,7 @@ void tearDown() throws Exception {
146
119
* during schema updates.
147
120
*/
148
121
@ Test
149
- void testRenameTablesWithinSingleStatement () throws Exception {
122
+ public void testRenameTablesWithinSingleStatement () throws Exception {
150
123
// Build Flink job
151
124
StreamExecutionEnvironment env = getExecutionEnvironment ();
152
125
MySqlSource <String > source = getSourceBuilder ().build ();
@@ -269,6 +242,8 @@ private MySqlSourceBuilder<String> getSourceBuilder() {
269
242
.password (customDatabase .getPassword ())
270
243
.databaseList (customDatabase .getDatabaseName ())
271
244
.tableList (customers .getTableId ())
245
+ .serverId (getServerId ())
246
+ .serverTimeZone ("UTC" )
272
247
.deserializer (new JsonDebeziumDeserializationSchema ());
273
248
}
274
249
@@ -392,4 +367,10 @@ private StreamExecutionEnvironment getExecutionEnvironment() {
392
367
env .getCheckpointConfig ().setCheckpointingMode (CheckpointingMode .EXACTLY_ONCE );
393
368
return env ;
394
369
}
370
+
371
+ private String getServerId () {
372
+ final Random random = new Random ();
373
+ int serverId = random .nextInt (100 ) + 5400 ;
374
+ return serverId + "-" + (serverId + DEFAULT_PARALLELISM );
375
+ }
395
376
}
0 commit comments