From 1b9c77b705cd8fb53e98932bd633e1cff0ebce5d Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 6 Feb 2025 10:16:06 +0800
Subject: [PATCH] mig-test: fix datastream migration failure
---
tools/mig-test/datastream/compile_jobs.rb | 2 +-
.../datastream/datastream-3.2.0/pom.xml | 92 +++++++++++++++++--
.../src/main/java/DataStreamJob.java | 8 +-
.../datastream/datastream-3.2.1/pom.xml | 90 ++++++++++++++++--
.../src/main/java/DataStreamJob.java | 8 +-
.../datastream/datastream-3.3.0/pom.xml | 92 +++++++++++++++++--
.../src/main/java/DataStreamJob.java | 8 +-
.../datastream-3.4-SNAPSHOT/pom.xml | 92 +++++++++++++++++--
.../src/main/java/DataStreamJob.java | 8 +-
.../mig-test/datastream/run_migration_test.rb | 16 +++-
tools/mig-test/run_migration_test.rb | 2 +-
11 files changed, 356 insertions(+), 62 deletions(-)
diff --git a/tools/mig-test/datastream/compile_jobs.rb b/tools/mig-test/datastream/compile_jobs.rb
index 5c906e5bf31..2326a454059 100644
--- a/tools/mig-test/datastream/compile_jobs.rb
+++ b/tools/mig-test/datastream/compile_jobs.rb
@@ -20,7 +20,7 @@
JOB_VERSIONS.each do |version|
puts "Compiling DataStream job for CDC #{version}"
- `cd datastream-#{version} && mvn clean package -DskipTests`
+ system "cd datastream-#{version} && mvn clean package -DskipTests"
end
puts 'Done'
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.2.0/pom.xml b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
index c1f556033d1..9eb0212f977 100644
--- a/tools/mig-test/datastream/datastream-3.2.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
UTF-8
1.18.1
3.2.0
- 1.9.7.Final
+ 1.9.8.Final
2.12
2.0.13
UTF-8
@@ -136,20 +136,96 @@ limitations under the License.
- maven-assembly-plugin
+ org.apache.maven.plugins
+ maven-shade-plugin
+ shade-flink
package
- single
+ shade
+
+
+ false
+ false
+ true
+
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+
+ *:*
+
+ module-info.class
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ io.debezium:debezium-api
+ io.debezium:debezium-embedded
+ io.debezium:debezium-core
+ io.debezium:debezium-ddl-parser
+ io.debezium:debezium-connector-mysql
+ org.apache.flink:flink-connector-debezium
+ org.apache.flink:flink-connector-mysql-cdc
+ org.antlr:antlr4-runtime
+ org.apache.kafka:*
+ mysql:mysql-connector-java
+ com.zendesk:mysql-binlog-connector-java
+ com.fasterxml.*:*
+ com.google.guava:*
+ com.esri.geometry:esri-geometry-api
+ com.zaxxer:HikariCP
+
+ org.apache.flink:flink-shaded-guava
+
+
+
+
+ org.apache.kafka
+
+ org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+
+
+
+ org.antlr
+
+ org.apache.flink.cdc.connectors.shaded.org.antlr
+
+
+
+ com.fasterxml
+
+ org.apache.flink.cdc.connectors.shaded.com.fasterxml
+
+
+
+ com.google
+
+ org.apache.flink.cdc.connectors.shaded.com.google
+
+
+
+ com.esri.geometry
+ org.apache.flink.cdc.connectors.shaded.com.esri.geometry
+
+
+ com.zaxxer
+
+ org.apache.flink.cdc.connectors.shaded.com.zaxxer
+
+
+
+
-
-
- jar-with-dependencies
-
-
diff --git a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/datastream-3.2.1/pom.xml b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
index c7d680a2f3b..b3e2117720b 100644
--- a/tools/mig-test/datastream/datastream-3.2.1/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.2.1/pom.xml
@@ -136,20 +136,96 @@ limitations under the License.
- maven-assembly-plugin
+ org.apache.maven.plugins
+ maven-shade-plugin
+ shade-flink
package
- single
+ shade
+
+
+ false
+ false
+ true
+
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+
+ *:*
+
+ module-info.class
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ io.debezium:debezium-api
+ io.debezium:debezium-embedded
+ io.debezium:debezium-core
+ io.debezium:debezium-ddl-parser
+ io.debezium:debezium-connector-mysql
+ org.apache.flink:flink-connector-debezium
+ org.apache.flink:flink-connector-mysql-cdc
+ org.antlr:antlr4-runtime
+ org.apache.kafka:*
+ mysql:mysql-connector-java
+ com.zendesk:mysql-binlog-connector-java
+ com.fasterxml.*:*
+ com.google.guava:*
+ com.esri.geometry:esri-geometry-api
+ com.zaxxer:HikariCP
+
+ org.apache.flink:flink-shaded-guava
+
+
+
+
+ org.apache.kafka
+
+ org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+
+
+
+ org.antlr
+
+ org.apache.flink.cdc.connectors.shaded.org.antlr
+
+
+
+ com.fasterxml
+
+ org.apache.flink.cdc.connectors.shaded.com.fasterxml
+
+
+
+ com.google
+
+ org.apache.flink.cdc.connectors.shaded.com.google
+
+
+
+ com.esri.geometry
+ org.apache.flink.cdc.connectors.shaded.com.esri.geometry
+
+
+ com.zaxxer
+
+ org.apache.flink.cdc.connectors.shaded.com.zaxxer
+
+
+
+
-
-
- jar-with-dependencies
-
-
diff --git a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/datastream-3.3.0/pom.xml b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
index cc65c6c7869..e3ba1b846d9 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.3.0/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
UTF-8
1.19.1
3.3.0
- 1.9.7.Final
+ 1.9.8.Final
2.12
2.0.13
UTF-8
@@ -136,20 +136,96 @@ limitations under the License.
- maven-assembly-plugin
+ org.apache.maven.plugins
+ maven-shade-plugin
+ shade-flink
package
- single
+ shade
+
+
+ false
+ false
+ true
+
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+
+ *:*
+
+ module-info.class
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ io.debezium:debezium-api
+ io.debezium:debezium-embedded
+ io.debezium:debezium-core
+ io.debezium:debezium-ddl-parser
+ io.debezium:debezium-connector-mysql
+ org.apache.flink:flink-connector-debezium
+ org.apache.flink:flink-connector-mysql-cdc
+ org.antlr:antlr4-runtime
+ org.apache.kafka:*
+ mysql:mysql-connector-java
+ com.zendesk:mysql-binlog-connector-java
+ com.fasterxml.*:*
+ com.google.guava:*
+ com.esri.geometry:esri-geometry-api
+ com.zaxxer:HikariCP
+
+ org.apache.flink:flink-shaded-guava
+
+
+
+
+ org.apache.kafka
+
+ org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+
+
+
+ org.antlr
+
+ org.apache.flink.cdc.connectors.shaded.org.antlr
+
+
+
+ com.fasterxml
+
+ org.apache.flink.cdc.connectors.shaded.com.fasterxml
+
+
+
+ com.google
+
+ org.apache.flink.cdc.connectors.shaded.com.google
+
+
+
+ com.esri.geometry
+ org.apache.flink.cdc.connectors.shaded.com.esri.geometry
+
+
+ com.zaxxer
+
+ org.apache.flink.cdc.connectors.shaded.com.zaxxer
+
+
+
+
-
-
- jar-with-dependencies
-
-
diff --git a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
index e174d5583cf..f83f6804295 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
UTF-8
1.19.1
3.4-SNAPSHOT
- 1.9.7.Final
+ 1.9.8.Final
2.12
2.0.13
UTF-8
@@ -136,20 +136,96 @@ limitations under the License.
- maven-assembly-plugin
+ org.apache.maven.plugins
+ maven-shade-plugin
+ shade-flink
package
- single
+ shade
+
+
+ false
+ false
+ true
+
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+
+ *:*
+
+ module-info.class
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ io.debezium:debezium-api
+ io.debezium:debezium-embedded
+ io.debezium:debezium-core
+ io.debezium:debezium-ddl-parser
+ io.debezium:debezium-connector-mysql
+ org.apache.flink:flink-connector-debezium
+ org.apache.flink:flink-connector-mysql-cdc
+ org.antlr:antlr4-runtime
+ org.apache.kafka:*
+ mysql:mysql-connector-java
+ com.zendesk:mysql-binlog-connector-java
+ com.fasterxml.*:*
+ com.google.guava:*
+ com.esri.geometry:esri-geometry-api
+ com.zaxxer:HikariCP
+
+ org.apache.flink:flink-shaded-guava
+
+
+
+
+ org.apache.kafka
+
+ org.apache.flink.cdc.connectors.shaded.org.apache.kafka
+
+
+
+ org.antlr
+
+ org.apache.flink.cdc.connectors.shaded.org.antlr
+
+
+
+ com.fasterxml
+
+ org.apache.flink.cdc.connectors.shaded.com.fasterxml
+
+
+
+ com.google
+
+ org.apache.flink.cdc.connectors.shaded.com.google
+
+
+
+ com.esri.geometry
+ org.apache.flink.cdc.connectors.shaded.com.esri.geometry
+
+
+ com.zaxxer
+
+ org.apache.flink.cdc.connectors.shaded.com.zaxxer
+
+
+
+
-
-
- jar-with-dependencies
-
-
diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
index f821ac0a2de..bfaa2d529bf 100644
--- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
+++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java
@@ -23,7 +23,7 @@
public class DataStreamJob {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("localhost")
.port(3306)
@@ -45,10 +45,6 @@ public static void main(String[] args) {
.print()
.setParallelism(1);
- try {
- env.execute();
- } catch (Exception e) {
- // ... unfortunately
- }
+ env.execute();
}
}
diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb
index deb16b0f074..9b355c1c3dd 100644
--- a/tools/mig-test/datastream/run_migration_test.rb
+++ b/tools/mig-test/datastream/run_migration_test.rb
@@ -32,12 +32,18 @@ def exec_sql_source(sql)
`mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
end
+def extract_job_id(output)
+ current_job_id = output.split("\n").filter { _1.start_with?('Job has been submitted with JobID ') }.first&.split&.last
+ raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32
+ current_job_id
+end
+
def put_mystery_data(mystery)
exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
end
def ensure_mystery_data(mystery)
- throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+ raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
puts ' Waiting for source to start up...'
@@ -52,8 +58,8 @@ def test_migration_chore(from_version, to_version)
# Clear previous savepoints and logs
`rm -rf savepoints`
- old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last
- raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32
+ old_output = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}.jar`
+ old_job_id = extract_job_id(old_output)
puts "Submitted job at #{from_version} as #{old_job_id}"
@@ -64,8 +70,8 @@ def test_migration_chore(from_version, to_version)
puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}`
savepoint_file = `ls savepoints`.split("\n").last
- new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last
- raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+ new_output = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}.jar`
+ new_job_id = extract_job_id(new_output)
puts "Submitted job at #{to_version} as #{new_job_id}"
random_string_2 = SecureRandom.hex(8)
diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb
index bb111252cbf..4ee9e20d897 100644
--- a/tools/mig-test/run_migration_test.rb
+++ b/tools/mig-test/run_migration_test.rb
@@ -37,7 +37,7 @@ def put_mystery_data(mystery)
end
def ensure_mystery_data(mystery)
- throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+ raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
def extract_job_id(output)