diff --git a/.gitignore b/.gitignore index c403a7ccc42..b0c5eb9a921 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,6 @@ target/ *.iml .DS_Store -log4j.properties metastore_db/ work_dir diff --git a/bin/start-seatunnel.sh b/bin/start-seatunnel.sh index 8f13b0305fd..68bdd0c8867 100755 --- a/bin/start-seatunnel.sh +++ b/bin/start-seatunnel.sh @@ -21,6 +21,8 @@ CMD_ARGUMENTS=$@ PARAMS="" +variables_substitution="-Ddefault=seatunnel" + while (( "$#" )); do case "$1" in -m|--master) @@ -128,7 +130,7 @@ variables_substitution=$(string_trim "${variables_substitution}") ## get spark conf from config file and specify them in spark-submit --conf function get_spark_conf { - spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkConf ${CONFIG_FILE}) + spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkConf ${CONFIG_FILE} "${variables_substitution}") if [ "$?" != "0" ]; then echo "[ERROR] config file does not exists or cannot be parsed due to invalid format" exit -1 @@ -140,29 +142,6 @@ sparkConf=$(get_spark_conf) echo "[INFO] spark conf: ${sparkConf}" -## get spark driver conf from config file and specify them in spark-submit -function get_spark_driver_conf { - spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkDriverConf ${CONFIG_FILE}) - if [ "$?" != "0" ]; then - echo "[ERROR] config file does not exists or cannot be parsed due to invalid format" - exit -1 - fi - echo ${spark_conf} -} - -sparkDriverConf=$(get_spark_driver_conf) - -# Spark Driver Options -driverJavaOpts="" -executorJavaOpts="" -clientModeDriverJavaOpts="" -if [ ! -z "${variables_substitution}" ]; then - driverJavaOpts="${variables_substitution}" - executorJavaOpts="${variables_substitution}" - # in local, client mode, driverJavaOpts can not work, we must use --driver-java-options - clientModeDriverJavaOpts="${variables_substitution}" -fi - ## compress plugins.tar.gz in cluster mode if [ "${DEPLOY_MODE}" == "cluster" ]; then @@ -183,17 +162,14 @@ if [ "${DEPLOY_MODE}" == "cluster" ]; then fi fi - -exec ${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop \ +CMD=(${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop \ --name $(getAppName ${CONFIG_FILE}) \ --master ${MASTER} \ --deploy-mode ${DEPLOY_MODE} \ - ${sparkDriverConf} \ --queue "${QUEUE}" \ - --driver-java-options "${clientModeDriverJavaOpts}" \ - --conf spark.executor.extraJavaOptions="${executorJavaOpts}" \ - --conf spark.driver.extraJavaOptions="${driverJavaOpts}" \ - ${sparkConf} \ + "${sparkConf}" \ ${JarDepOpts} \ ${FilesDepOpts} \ - ${assemblyJarName} ${CMD_ARGUMENTS} + ${assemblyJarName} ${CMD_ARGUMENTS}) + +eval "${CMD[@]}" diff --git a/config/log4j.properties b/config/log4j.properties new file mode 100644 index 00000000000..c6eaa574f63 --- /dev/null +++ b/config/log4j.properties @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=ERROR, console + +# set the log level for these components +log4j.logger.org=ERROR +log4j.logger.org.apache.spark=ERROR +log4j.logger.org.spark-project=ERROR +log4j.logger.org.apache.hadoop=ERROR +log4j.logger.io.netty=ERROR +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.spark.ui.SparkUI=INFO + +# add a ConsoleAppender to the logger stdout to write to the console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +# use a simple message format +log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkConf.java b/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkConf.java index 69eceb373f3..31194b46809 100644 --- a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkConf.java +++ b/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkConf.java @@ -27,21 +27,22 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.net.URISyntaxException; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.Map; public class ExposeSparkConf { - private static final String spark_driver_extraJavaOptions = "spark.driver.extraJavaOptions"; private static final String spark_executor_extraJavaOptions = "spark.executor.extraJavaOptions"; + private static final String spark_driver_prefix = "spark.driver."; public static void main(String[] args) throws Exception { Config appConfig = ConfigFactory.parseFile(new File(args[0])) .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)) .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true)); + String variables = args[1]; StringBuilder stringBuilder = new StringBuilder(); - Map sparkConfs = new LinkedHashMap(); + Map sparkConfMap = new HashMap<>(); for (Map.Entry entry : appConfig.getConfig("spark").entrySet()) { String key = entry.getKey(); String value = entry.getValue().unwrapped().toString(); @@ -50,37 +51,87 @@ public static void main(String[] args) throws Exception { String conf = String.format(" --%s %s", argKey, value); stringBuilder.append(conf); } else { - String v = sparkConfs.getOrDefault(key, null); + String v = sparkConfMap.getOrDefault(key, null); if (StringUtils.isBlank(v)) { - sparkConfs.put(key, value); + sparkConfMap.put(key, value); } else { - sparkConfs.put(key, v + " " + value); + sparkConfMap.put(key, v + " " + value); } } } - for (Map.Entry c : sparkConfs.entrySet()) { + + if (!sparkConfMap.containsKey(spark_executor_extraJavaOptions)) { + sparkConfMap.put(spark_executor_extraJavaOptions, variables); + } else { + sparkConfMap.put(spark_executor_extraJavaOptions, + sparkConfMap.get(spark_executor_extraJavaOptions) + " " + variables); + } + + for (Map.Entry c : sparkConfMap.entrySet()) { String v = addLogPropertiesIfNeeded(c.getKey(), c.getValue()); - String conf = String.format(" --conf %s=%s ", c.getKey(), v); + String conf = String.format(" --conf %s=\"%s\"", c.getKey(), v); stringBuilder.append(conf); } - if (!sparkConfs.containsKey(spark_driver_extraJavaOptions)) { - stringBuilder.append(" --conf " + spark_driver_extraJavaOptions + "=" + logConfiguration()); + String sparkDriverConf = exposeSparkDriverConf(appConfig, variables); + + System.out.print(sparkDriverConf + " " + stringBuilder.toString()); + } + + + /** + * In client mode, this config must not be set through the SparkConf directly in your application + * eg. using --driver-java-options instead of spark.driver.extraJavaOptions + * @param appConfig + * @param variables + * @return + */ + private static String exposeSparkDriverConf(Config appConfig, String variables) { + Config sparkConfig = appConfig.getConfig("spark"); + + Map sparkDriverMap = new HashMap<>(); + if (TypesafeConfigUtils.hasSubConfig(sparkConfig, spark_driver_prefix)) { + + Config sparkDriverConfig = TypesafeConfigUtils.extractSubConfig(sparkConfig, spark_driver_prefix, true); + for (Map.Entry entry : sparkDriverConfig.entrySet()) { + String key = entry.getKey(); + SparkDriverSettings settings = SparkDriverSettings.fromProperty(key); + if (settings != null) { + sparkDriverMap.put(settings.option, entry.getValue().unwrapped().toString()); + + } + } } - if (!sparkConfs.containsKey(spark_executor_extraJavaOptions)) { - stringBuilder.append(" --conf " + spark_executor_extraJavaOptions + "=" + logConfiguration()); + if (!sparkDriverMap.containsKey(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option)) { + sparkDriverMap.put(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option, variables); + } else { + sparkDriverMap.put(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option, + sparkDriverMap.get(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option) + " " + variables); } - System.out.print(stringBuilder.toString()); + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry c : sparkDriverMap.entrySet()) { + String v = addLogPropertiesIfNeeded(c.getKey(), c.getValue()); + String conf = String.format(" %s=\"%s\" ", c.getKey(), v); + stringBuilder.append(conf); + } + + return stringBuilder.toString(); + } + /** + * if log4j.configuration is not specified, set default file path + * @param key + * @param value + * @return + */ private static String addLogPropertiesIfNeeded(String key, String value) { - if (!value.contains("-Dlog4j.configuration")) { - if (key.equals(spark_driver_extraJavaOptions) - || key.equals(spark_executor_extraJavaOptions) ) { - + if (key.equals(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option) + || key.equals(spark_executor_extraJavaOptions)) { + if (!value.contains("-Dlog4j.configuration")) { return value + " " + logConfiguration(); } } diff --git a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkDriverConf.java b/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkDriverConf.java deleted file mode 100644 index 7d98a774a9f..00000000000 --- a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/ExposeSparkDriverConf.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package io.github.interestinglab.waterdrop.config; - - -import java.util.Map; -import java.io.File; - - -public class ExposeSparkDriverConf { - - - public static void main(String[] args) throws Exception { - Config appConfig = ConfigFactory.parseFile(new File(args[0])) - .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)) - .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true)); - - String driverPrefix = "spark.driver."; - Config sparkConfig = appConfig.getConfig("spark"); - - if (!TypesafeConfigUtils.hasSubConfig(sparkConfig, driverPrefix)) { - System.out.println(""); - } else { - Config sparkDriverConfig = TypesafeConfigUtils.extractSubConfig(sparkConfig, driverPrefix, true); - StringBuilder stringBuilder = new StringBuilder(); - for (Map.Entry entry : sparkDriverConfig.entrySet()) { - String key = entry.getKey(); - SparkDriverSettings settings = SparkDriverSettings.fromProperty(key); - if (settings != null) { - String conf = String.format(" %s=%s ", settings.option, entry.getValue().unwrapped()); - stringBuilder.append(conf); - } - } - - System.out.println(stringBuilder.toString()); - } - } -} diff --git a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/SparkDriverSettings.java b/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/SparkDriverSettings.java index 23bc6609886..93238e3eda3 100644 --- a/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/SparkDriverSettings.java +++ b/waterdrop-core/src/main/java/io/github/interestinglab/waterdrop/config/SparkDriverSettings.java @@ -24,8 +24,11 @@ public enum SparkDriverSettings { DRIVER_MEMORY("spark.driver.memory", "--driver-memory"), + DRIVER_JAVA_OPTIONS("spark.driver.extraJavaOptions", "--driver-java-options"), + DRIVER_LIBRARY_PATH("spark.driver.extraLibraryPath", "--driver-library-path"), + DRIVER_CLASS_PATH("spark.driver.extraClassPath", "--driver-class-path");