21
21
import org .apache .flink .api .common .typeutils .TypeSerializer ;
22
22
import org .apache .flink .cdc .common .utils .TestCaseUtils ;
23
23
import org .apache .flink .cdc .connectors .mysql .debezium .DebeziumUtils ;
24
- import org .apache .flink .cdc .connectors .mysql .testutils .MySqlContainer ;
25
24
import org .apache .flink .cdc .connectors .mysql .testutils .TestTable ;
26
25
import org .apache .flink .cdc .connectors .mysql .testutils .TestTableSchemas ;
27
26
import org .apache .flink .cdc .connectors .mysql .testutils .UniqueDatabase ;
38
37
import org .apache .flink .streaming .api .operators .collect .CollectSinkOperator ;
39
38
import org .apache .flink .streaming .api .operators .collect .CollectSinkOperatorFactory ;
40
39
import org .apache .flink .streaming .api .operators .collect .CollectStreamSink ;
41
- import org .apache .flink .test .junit5 .MiniClusterExtension ;
42
- import org .apache .flink .util .FlinkRuntimeException ;
43
40
44
41
import io .debezium .connector .mysql .MySqlConnection ;
45
- import org .junit .jupiter .api .AfterAll ;
46
- import org .junit .jupiter .api .AfterEach ;
47
- import org .junit .jupiter .api .BeforeAll ;
48
- import org .junit .jupiter .api .BeforeEach ;
49
- import org .junit .jupiter .api .Test ;
50
- import org .junit .jupiter .api .extension .RegisterExtension ;
51
- import org .junit .rules .TemporaryFolder ;
42
+ import org .junit .After ;
43
+ import org .junit .Before ;
44
+ import org .junit .Test ;
52
45
import org .slf4j .Logger ;
53
46
import org .slf4j .LoggerFactory ;
54
- import org .testcontainers .containers .output .Slf4jLogConsumer ;
55
47
56
- import java .io .File ;
57
48
import java .lang .reflect .Field ;
58
- import java .nio .charset .StandardCharsets ;
59
49
import java .nio .file .Files ;
60
50
import java .nio .file .Path ;
61
- import java .nio .file .Paths ;
62
- import java .nio .file .StandardOpenOption ;
63
- import java .time .ZoneId ;
64
51
import java .util .ArrayList ;
65
52
import java .util .Arrays ;
66
- import java .util .Collections ;
67
53
import java .util .HashMap ;
68
54
import java .util .Iterator ;
69
55
import java .util .List ;
70
56
import java .util .Map ;
71
- import java .util .Objects ;
72
57
import java .util .Properties ;
58
+ import java .util .Random ;
73
59
import java .util .UUID ;
74
60
75
61
/**
76
62
* Integration tests for handling schema changes regard to renaming multiple tables within a single
77
63
* statement.
78
64
*/
79
- public class MySqlMultipleTablesRenamingITCase {
65
+ public class MySqlMultipleTablesRenamingITCase extends MySqlSourceTestBase {
80
66
private static final Logger LOG =
81
67
LoggerFactory .getLogger (MySqlMultipleTablesRenamingITCase .class );
82
- @ RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension ();
83
-
84
- @ SuppressWarnings ("unchecked" )
85
- private static final MySqlContainer MYSQL_CONTAINER =
86
- (MySqlContainer )
87
- new MySqlContainer ()
88
- .withConfigurationOverride (
89
- buildMySqlConfigWithTimezone (
90
- getResourceFolder (), getSystemTimeZone ()))
91
- .withSetupSQL ("docker/setup.sql" )
92
- .withDatabaseName ("flink-test" )
93
- .withUsername ("flinkuser" )
94
- .withPassword ("flinkpw" )
95
- .withLogConsumer (new Slf4jLogConsumer (LOG ));
96
68
97
69
private final UniqueDatabase customDatabase =
98
70
new UniqueDatabase (MYSQL_CONTAINER , "customer" , "mysqluser" , "mysqlpw" );
@@ -101,25 +73,15 @@ public class MySqlMultipleTablesRenamingITCase {
101
73
102
74
private MySqlConnection connection ;
103
75
104
- @ BeforeAll
105
- public static void before () throws Exception {
106
- MYSQL_CONTAINER .start ();
107
- }
108
-
109
- @ AfterAll
110
- public static void after () throws Exception {
111
- MYSQL_CONTAINER .stop ();
112
- }
113
-
114
- @ BeforeEach
115
- void prepare () throws Exception {
76
+ @ Before
77
+ public void prepare () throws Exception {
116
78
connection = getConnection ();
117
79
customDatabase .createAndInitialize ();
118
80
flushLogs ();
119
81
}
120
82
121
- @ AfterEach
122
- void tearDown () throws Exception {
83
+ @ After
84
+ public void tearDown () throws Exception {
123
85
customDatabase .dropDatabase ();
124
86
connection .close ();
125
87
}
@@ -146,7 +108,7 @@ void tearDown() throws Exception {
146
108
* during schema updates.
147
109
*/
148
110
@ Test
149
- void testRenameTablesWithinSingleStatement () throws Exception {
111
+ public void testRenameTablesWithinSingleStatement () throws Exception {
150
112
// Build Flink job
151
113
StreamExecutionEnvironment env = getExecutionEnvironment ();
152
114
MySqlSource <String > source = getSourceBuilder ().build ();
@@ -269,6 +231,8 @@ private MySqlSourceBuilder<String> getSourceBuilder() {
269
231
.password (customDatabase .getPassword ())
270
232
.databaseList (customDatabase .getDatabaseName ())
271
233
.tableList (customers .getTableId ())
234
+ .serverId (getServerId ())
235
+ .serverTimeZone ("UTC" )
272
236
.deserializer (new JsonDebeziumDeserializationSchema ());
273
237
}
274
238
@@ -323,50 +287,6 @@ private static List<String> fetchRow(Iterator<String> iter, int size) {
323
287
return rows ;
324
288
}
325
289
326
- private static String buildMySqlConfigWithTimezone (File resourceDirectory , String timezone ) {
327
- try {
328
- TemporaryFolder tempFolder = new TemporaryFolder (resourceDirectory );
329
- tempFolder .create ();
330
- File folder = tempFolder .newFolder (String .valueOf (UUID .randomUUID ()));
331
- Path cnf = Files .createFile (Paths .get (folder .getPath (), "my.cnf" ));
332
- String mysqldConf =
333
- "[mysqld]\n "
334
- + "binlog_format = row\n "
335
- + "log_bin = mysql-bin\n "
336
- + "server-id = 223344\n "
337
- + "binlog_row_image = FULL\n "
338
- + "gtid_mode = on\n "
339
- + "enforce_gtid_consistency = on\n " ;
340
- String timezoneConf = "default-time_zone = '" + timezone + "'\n " ;
341
- Files .write (
342
- cnf ,
343
- Collections .singleton (mysqldConf + timezoneConf ),
344
- StandardCharsets .UTF_8 ,
345
- StandardOpenOption .APPEND );
346
- return Paths .get (resourceDirectory .getAbsolutePath ()).relativize (cnf ).toString ();
347
- } catch (Exception e ) {
348
- throw new RuntimeException ("Failed to create my.cnf file." , e );
349
- }
350
- }
351
-
352
- private static File getResourceFolder () {
353
- try {
354
- return Paths .get (
355
- Objects .requireNonNull (
356
- SpecificStartingOffsetITCase .class
357
- .getClassLoader ()
358
- .getResource ("." ))
359
- .toURI ())
360
- .toFile ();
361
- } catch (Exception e ) {
362
- throw new FlinkRuntimeException ("Get Resource File Directory fail" );
363
- }
364
- }
365
-
366
- private static String getSystemTimeZone () {
367
- return ZoneId .systemDefault ().toString ();
368
- }
369
-
370
290
private void setupSavepoint (StreamExecutionEnvironment env , String savepointPath )
371
291
throws Exception {
372
292
// restore from savepoint
@@ -392,4 +312,10 @@ private StreamExecutionEnvironment getExecutionEnvironment() {
392
312
env .getCheckpointConfig ().setCheckpointingMode (CheckpointingMode .EXACTLY_ONCE );
393
313
return env ;
394
314
}
315
+
316
+ private String getServerId () {
317
+ final Random random = new Random ();
318
+ int serverId = random .nextInt (100 ) + 5400 ;
319
+ return serverId + "-" + (serverId + DEFAULT_PARALLELISM );
320
+ }
395
321
}
0 commit comments