From 8056010db981140d19229c96483dbd841af16eff Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 6 Feb 2025 10:16:06 +0800 Subject: [PATCH] mig-test: fix datastream migration steps failure --- tools/mig-test/datastream/compile_jobs.rb | 2 +- .../datastream/datastream-3.2.0/pom.xml | 92 +++++++++++++++++-- .../src/main/java/DataStreamJob.java | 8 +- .../datastream/datastream-3.2.1/pom.xml | 90 ++++++++++++++++-- .../src/main/java/DataStreamJob.java | 8 +- .../datastream/datastream-3.3.0/pom.xml | 92 +++++++++++++++++-- .../src/main/java/DataStreamJob.java | 8 +- .../datastream-3.4-SNAPSHOT/pom.xml | 92 +++++++++++++++++-- .../src/main/java/DataStreamJob.java | 8 +- .../mig-test/datastream/run_migration_test.rb | 16 +++- tools/mig-test/run_migration_test.rb | 2 +- 11 files changed, 356 insertions(+), 62 deletions(-) diff --git a/tools/mig-test/datastream/compile_jobs.rb b/tools/mig-test/datastream/compile_jobs.rb index 5c906e5bf31..2326a454059 100644 --- a/tools/mig-test/datastream/compile_jobs.rb +++ b/tools/mig-test/datastream/compile_jobs.rb @@ -20,7 +20,7 @@ JOB_VERSIONS.each do |version| puts "Compiling DataStream job for CDC #{version}" - `cd datastream-#{version} && mvn clean package -DskipTests` + system "cd datastream-#{version} && mvn clean package -DskipTests" end puts 'Done' \ No newline at end of file diff --git a/tools/mig-test/datastream/datastream-3.2.0/pom.xml b/tools/mig-test/datastream/datastream-3.2.0/pom.xml index c1f556033d1..9eb0212f977 100644 --- a/tools/mig-test/datastream/datastream-3.2.0/pom.xml +++ b/tools/mig-test/datastream/datastream-3.2.0/pom.xml @@ -29,7 +29,7 @@ limitations under the License. UTF-8 1.18.1 3.2.0 - 1.9.7.Final + 1.9.8.Final 2.12 2.0.13 UTF-8 @@ -136,20 +136,96 @@ limitations under the License. - maven-assembly-plugin + org.apache.maven.plugins + maven-shade-plugin + shade-flink package - single + shade + + + false + false + true + + ${project.basedir}/target/dependency-reduced-pom.xml + + + + *:* + + module-info.class + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-ddl-parser + io.debezium:debezium-connector-mysql + org.apache.flink:flink-connector-debezium + org.apache.flink:flink-connector-mysql-cdc + org.antlr:antlr4-runtime + org.apache.kafka:* + mysql:mysql-connector-java + com.zendesk:mysql-binlog-connector-java + com.fasterxml.*:* + com.google.guava:* + com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka + + org.apache.flink.cdc.connectors.shaded.org.apache.kafka + + + + org.antlr + + org.apache.flink.cdc.connectors.shaded.org.antlr + + + + com.fasterxml + + org.apache.flink.cdc.connectors.shaded.com.fasterxml + + + + com.google + + org.apache.flink.cdc.connectors.shaded.com.google + + + + com.esri.geometry + org.apache.flink.cdc.connectors.shaded.com.esri.geometry + + + com.zaxxer + + org.apache.flink.cdc.connectors.shaded.com.zaxxer + + + + - - - jar-with-dependencies - - diff --git a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java index f821ac0a2de..bfaa2d529bf 100644 --- a/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java +++ b/tools/mig-test/datastream/datastream-3.2.0/src/main/java/DataStreamJob.java @@ -23,7 +23,7 @@ public class DataStreamJob { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { MySqlSource mySqlSource = MySqlSource.builder() .hostname("localhost") .port(3306) @@ -45,10 +45,6 @@ public static void main(String[] args) { .print() .setParallelism(1); - try { - env.execute(); - } catch (Exception e) { - // ... unfortunately - } + env.execute(); } } diff --git a/tools/mig-test/datastream/datastream-3.2.1/pom.xml b/tools/mig-test/datastream/datastream-3.2.1/pom.xml index c7d680a2f3b..b3e2117720b 100644 --- a/tools/mig-test/datastream/datastream-3.2.1/pom.xml +++ b/tools/mig-test/datastream/datastream-3.2.1/pom.xml @@ -136,20 +136,96 @@ limitations under the License. - maven-assembly-plugin + org.apache.maven.plugins + maven-shade-plugin + shade-flink package - single + shade + + + false + false + true + + ${project.basedir}/target/dependency-reduced-pom.xml + + + + *:* + + module-info.class + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-ddl-parser + io.debezium:debezium-connector-mysql + org.apache.flink:flink-connector-debezium + org.apache.flink:flink-connector-mysql-cdc + org.antlr:antlr4-runtime + org.apache.kafka:* + mysql:mysql-connector-java + com.zendesk:mysql-binlog-connector-java + com.fasterxml.*:* + com.google.guava:* + com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka + + org.apache.flink.cdc.connectors.shaded.org.apache.kafka + + + + org.antlr + + org.apache.flink.cdc.connectors.shaded.org.antlr + + + + com.fasterxml + + org.apache.flink.cdc.connectors.shaded.com.fasterxml + + + + com.google + + org.apache.flink.cdc.connectors.shaded.com.google + + + + com.esri.geometry + org.apache.flink.cdc.connectors.shaded.com.esri.geometry + + + com.zaxxer + + org.apache.flink.cdc.connectors.shaded.com.zaxxer + + + + - - - jar-with-dependencies - - diff --git a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java index f821ac0a2de..bfaa2d529bf 100644 --- a/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java +++ b/tools/mig-test/datastream/datastream-3.2.1/src/main/java/DataStreamJob.java @@ -23,7 +23,7 @@ public class DataStreamJob { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { MySqlSource mySqlSource = MySqlSource.builder() .hostname("localhost") .port(3306) @@ -45,10 +45,6 @@ public static void main(String[] args) { .print() .setParallelism(1); - try { - env.execute(); - } catch (Exception e) { - // ... unfortunately - } + env.execute(); } } diff --git a/tools/mig-test/datastream/datastream-3.3.0/pom.xml b/tools/mig-test/datastream/datastream-3.3.0/pom.xml index cc65c6c7869..e3ba1b846d9 100644 --- a/tools/mig-test/datastream/datastream-3.3.0/pom.xml +++ b/tools/mig-test/datastream/datastream-3.3.0/pom.xml @@ -29,7 +29,7 @@ limitations under the License. UTF-8 1.19.1 3.3.0 - 1.9.7.Final + 1.9.8.Final 2.12 2.0.13 UTF-8 @@ -136,20 +136,96 @@ limitations under the License. - maven-assembly-plugin + org.apache.maven.plugins + maven-shade-plugin + shade-flink package - single + shade + + + false + false + true + + ${project.basedir}/target/dependency-reduced-pom.xml + + + + *:* + + module-info.class + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-ddl-parser + io.debezium:debezium-connector-mysql + org.apache.flink:flink-connector-debezium + org.apache.flink:flink-connector-mysql-cdc + org.antlr:antlr4-runtime + org.apache.kafka:* + mysql:mysql-connector-java + com.zendesk:mysql-binlog-connector-java + com.fasterxml.*:* + com.google.guava:* + com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka + + org.apache.flink.cdc.connectors.shaded.org.apache.kafka + + + + org.antlr + + org.apache.flink.cdc.connectors.shaded.org.antlr + + + + com.fasterxml + + org.apache.flink.cdc.connectors.shaded.com.fasterxml + + + + com.google + + org.apache.flink.cdc.connectors.shaded.com.google + + + + com.esri.geometry + org.apache.flink.cdc.connectors.shaded.com.esri.geometry + + + com.zaxxer + + org.apache.flink.cdc.connectors.shaded.com.zaxxer + + + + - - - jar-with-dependencies - - diff --git a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java index f821ac0a2de..bfaa2d529bf 100644 --- a/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java +++ b/tools/mig-test/datastream/datastream-3.3.0/src/main/java/DataStreamJob.java @@ -23,7 +23,7 @@ public class DataStreamJob { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { MySqlSource mySqlSource = MySqlSource.builder() .hostname("localhost") .port(3306) @@ -45,10 +45,6 @@ public static void main(String[] args) { .print() .setParallelism(1); - try { - env.execute(); - } catch (Exception e) { - // ... unfortunately - } + env.execute(); } } diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml index e174d5583cf..f83f6804295 100644 --- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml +++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/pom.xml @@ -29,7 +29,7 @@ limitations under the License. UTF-8 1.19.1 3.4-SNAPSHOT - 1.9.7.Final + 1.9.8.Final 2.12 2.0.13 UTF-8 @@ -136,20 +136,96 @@ limitations under the License. - maven-assembly-plugin + org.apache.maven.plugins + maven-shade-plugin + shade-flink package - single + shade + + + false + false + true + + ${project.basedir}/target/dependency-reduced-pom.xml + + + + *:* + + module-info.class + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-ddl-parser + io.debezium:debezium-connector-mysql + org.apache.flink:flink-connector-debezium + org.apache.flink:flink-connector-mysql-cdc + org.antlr:antlr4-runtime + org.apache.kafka:* + mysql:mysql-connector-java + com.zendesk:mysql-binlog-connector-java + com.fasterxml.*:* + com.google.guava:* + com.esri.geometry:esri-geometry-api + com.zaxxer:HikariCP + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka + + org.apache.flink.cdc.connectors.shaded.org.apache.kafka + + + + org.antlr + + org.apache.flink.cdc.connectors.shaded.org.antlr + + + + com.fasterxml + + org.apache.flink.cdc.connectors.shaded.com.fasterxml + + + + com.google + + org.apache.flink.cdc.connectors.shaded.com.google + + + + com.esri.geometry + org.apache.flink.cdc.connectors.shaded.com.esri.geometry + + + com.zaxxer + + org.apache.flink.cdc.connectors.shaded.com.zaxxer + + + + - - - jar-with-dependencies - - diff --git a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java index f821ac0a2de..bfaa2d529bf 100644 --- a/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java +++ b/tools/mig-test/datastream/datastream-3.4-SNAPSHOT/src/main/java/DataStreamJob.java @@ -23,7 +23,7 @@ public class DataStreamJob { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { MySqlSource mySqlSource = MySqlSource.builder() .hostname("localhost") .port(3306) @@ -45,10 +45,6 @@ public static void main(String[] args) { .print() .setParallelism(1); - try { - env.execute(); - } catch (Exception e) { - // ... unfortunately - } + env.execute(); } } diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb index deb16b0f074..9b355c1c3dd 100644 --- a/tools/mig-test/datastream/run_migration_test.rb +++ b/tools/mig-test/datastream/run_migration_test.rb @@ -32,12 +32,18 @@ def exec_sql_source(sql) `mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"` end +def extract_job_id(output) + current_job_id = output.split("\n").filter { _1.start_with?('Job has been submitted with JobID ') }.first&.split&.last + raise StandardError, "Failed to submit Flink job. Output: #{output}" unless current_job_id&.length == 32 + current_job_id +end + def put_mystery_data(mystery) exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');") end def ensure_mystery_data(mystery) - throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery + raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery end puts ' Waiting for source to start up...' @@ -52,8 +58,8 @@ def test_migration_chore(from_version, to_version) # Clear previous savepoints and logs `rm -rf savepoints` - old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last - raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32 + old_output = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}.jar` + old_job_id = extract_job_id(old_output) puts "Submitted job at #{from_version} as #{old_job_id}" @@ -64,8 +70,8 @@ def test_migration_chore(from_version, to_version) puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}` savepoint_file = `ls savepoints`.split("\n").last - new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last - raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32 + new_output = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}.jar` + new_job_id = extract_job_id(new_output) puts "Submitted job at #{to_version} as #{new_job_id}" random_string_2 = SecureRandom.hex(8) diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb index bb111252cbf..4ee9e20d897 100644 --- a/tools/mig-test/run_migration_test.rb +++ b/tools/mig-test/run_migration_test.rb @@ -37,7 +37,7 @@ def put_mystery_data(mystery) end def ensure_mystery_data(mystery) - throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery + raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery end def extract_job_id(output)