Commit b27404b

Merge pull request #743 from InterestingLab/fixbug-719

Fix bug of #719

garyelephant authored Dec 22, 2021
2 parents 54af233 + 55b4a21 commit b27404b
Showing 6 changed files with 110 additions and 105 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -21,7 +21,6 @@ target/
 *.iml
 
 .DS_Store
-log4j.properties
 metastore_db/
 
 work_dir
40 changes: 8 additions & 32 deletions bin/start-seatunnel.sh
@@ -21,6 +21,8 @@
 CMD_ARGUMENTS=$@
 
 PARAMS=""
+variables_substitution="-Ddefault=seatunnel"
+
 while (( "$#" )); do
   case "$1" in
     -m|--master)
@@ -128,7 +130,7 @@ variables_substitution=$(string_trim "${variables_substitution}")
 
 ## get spark conf from config file and specify them in spark-submit --conf
 function get_spark_conf {
-    spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkConf ${CONFIG_FILE})
+    spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkConf ${CONFIG_FILE} "${variables_substitution}")
     if [ "$?" != "0" ]; then
         echo "[ERROR] config file does not exists or cannot be parsed due to invalid format"
         exit -1
@@ -140,29 +142,6 @@ sparkConf=$(get_spark_conf)
 
 echo "[INFO] spark conf: ${sparkConf}"
 
-## get spark driver conf from config file and specify them in spark-submit
-function get_spark_driver_conf {
-    spark_conf=$(java ${variables_substitution} -cp ${assemblyJarName} io.github.interestinglab.waterdrop.config.ExposeSparkDriverConf ${CONFIG_FILE})
-    if [ "$?" != "0" ]; then
-        echo "[ERROR] config file does not exists or cannot be parsed due to invalid format"
-        exit -1
-    fi
-    echo ${spark_conf}
-}
-
-sparkDriverConf=$(get_spark_driver_conf)
-
-# Spark Driver Options
-driverJavaOpts=""
-executorJavaOpts=""
-clientModeDriverJavaOpts=""
-if [ ! -z "${variables_substitution}" ]; then
-    driverJavaOpts="${variables_substitution}"
-    executorJavaOpts="${variables_substitution}"
-    # in local, client mode, driverJavaOpts can not work, we must use --driver-java-options
-    clientModeDriverJavaOpts="${variables_substitution}"
-fi
-
 
 ## compress plugins.tar.gz in cluster mode
 if [ "${DEPLOY_MODE}" == "cluster" ]; then
@@ -183,17 +162,14 @@ if [ "${DEPLOY_MODE}" == "cluster" ]; then
     fi
 fi
 
-
-exec ${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop \
+CMD=(${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop \
     --name $(getAppName ${CONFIG_FILE}) \
     --master ${MASTER} \
     --deploy-mode ${DEPLOY_MODE} \
-    ${sparkDriverConf} \
     --queue "${QUEUE}" \
-    --driver-java-options "${clientModeDriverJavaOpts}" \
-    --conf spark.executor.extraJavaOptions="${executorJavaOpts}" \
-    --conf spark.driver.extraJavaOptions="${driverJavaOpts}" \
-    ${sparkConf} \
+    "${sparkConf}" \
    ${JarDepOpts} \
    ${FilesDepOpts} \
-    ${assemblyJarName} ${CMD_ARGUMENTS}
+    ${assemblyJarName} ${CMD_ARGUMENTS})
+
+eval "${CMD[@]}"
31 changes: 31 additions & 0 deletions config/log4j.properties
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=ERROR, console
+
+# set the log level for these components
+log4j.logger.org=ERROR
+log4j.logger.org.apache.spark=ERROR
+log4j.logger.org.spark-project=ERROR
+log4j.logger.org.apache.hadoop=ERROR
+log4j.logger.io.netty=ERROR
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.spark.ui.SparkUI=INFO
+
+# add a ConsoleAppender to the logger stdout to write to the console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+# use a simple message format
+log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
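This tracked file replaces the per-developer log4j.properties that was previously git-ignored; addLogPropertiesIfNeeded in ExposeSparkConf appends a -Dlog4j.configuration flag for it whenever the driver or executor Java options do not already set one. As an illustration of what these properties do, a plain JVM could also load the same file programmatically; a sketch assuming log4j 1.x on the classpath and the file at this relative path:

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

// Illustration only (assumes log4j 1.x and the file at this relative path);
// Spark itself normally receives the file via -Dlog4j.configuration.
public class Log4jConfigDemo {
    public static void main(String[] args) {
        PropertyConfigurator.configure("config/log4j.properties");
        Logger log = Logger.getLogger(Log4jConfigDemo.class);
        log.error("printed: rootLogger is ERROR with a console appender");
        log.info("suppressed: INFO is below the ERROR threshold");
    }
}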
io/github/interestinglab/waterdrop/config/ExposeSparkConf.java

@@ -27,21 +27,22 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
 
 public class ExposeSparkConf {
 
-    private static final String spark_driver_extraJavaOptions = "spark.driver.extraJavaOptions";
     private static final String spark_executor_extraJavaOptions = "spark.executor.extraJavaOptions";
+    private static final String spark_driver_prefix = "spark.driver.";
 
     public static void main(String[] args) throws Exception {
         Config appConfig = ConfigFactory.parseFile(new File(args[0]))
                 .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
                 .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
+        String variables = args[1];
         StringBuilder stringBuilder = new StringBuilder();
-        Map<String, String> sparkConfs = new LinkedHashMap<String, String>();
+        Map<String, String> sparkConfMap = new HashMap<>();
         for (Map.Entry<String, ConfigValue> entry : appConfig.getConfig("spark").entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue().unwrapped().toString();
@@ -50,37 +51,87 @@ public static void main(String[] args) throws Exception {
                 String conf = String.format(" --%s %s", argKey, value);
                 stringBuilder.append(conf);
             } else {
-                String v = sparkConfs.getOrDefault(key, null);
+                String v = sparkConfMap.getOrDefault(key, null);
                 if (StringUtils.isBlank(v)) {
-                    sparkConfs.put(key, value);
+                    sparkConfMap.put(key, value);
                 } else {
-                    sparkConfs.put(key, v + " " + value);
+                    sparkConfMap.put(key, v + " " + value);
                 }
             }
         }
 
+
+        if (!sparkConfMap.containsKey(spark_executor_extraJavaOptions)) {
+            sparkConfMap.put(spark_executor_extraJavaOptions, variables);
+        } else {
+            sparkConfMap.put(spark_executor_extraJavaOptions,
+                    sparkConfMap.get(spark_executor_extraJavaOptions) + " " + variables);
+        }
+
-        for (Map.Entry<String, String> c : sparkConfs.entrySet()) {
+        for (Map.Entry<String, String> c : sparkConfMap.entrySet()) {
             String v = addLogPropertiesIfNeeded(c.getKey(), c.getValue());
-            String conf = String.format(" --conf %s=%s ", c.getKey(), v);
+            String conf = String.format(" --conf %s=\"%s\"", c.getKey(), v);
             stringBuilder.append(conf);
         }
 
-        if (!sparkConfs.containsKey(spark_driver_extraJavaOptions)) {
-            stringBuilder.append(" --conf " + spark_driver_extraJavaOptions + "=" + logConfiguration());
-        }
-
-        if (!sparkConfs.containsKey(spark_executor_extraJavaOptions)) {
-            stringBuilder.append(" --conf " + spark_executor_extraJavaOptions + "=" + logConfiguration());
-        }
-
-        System.out.print(stringBuilder.toString());
+        String sparkDriverConf = exposeSparkDriverConf(appConfig, variables);
+
+        System.out.print(sparkDriverConf + " " + stringBuilder.toString());
     }
 
+    /**
+     * In client mode, this config must not be set through the SparkConf directly in your application
+     * eg. using --driver-java-options instead of spark.driver.extraJavaOptions
+     * @param appConfig
+     * @param variables
+     * @return
+     */
+    private static String exposeSparkDriverConf(Config appConfig, String variables) {
+        Config sparkConfig = appConfig.getConfig("spark");
+
+        Map<String, String> sparkDriverMap = new HashMap<>();
+        if (TypesafeConfigUtils.hasSubConfig(sparkConfig, spark_driver_prefix)) {
+
+            Config sparkDriverConfig = TypesafeConfigUtils.extractSubConfig(sparkConfig, spark_driver_prefix, true);
+            for (Map.Entry<String, ConfigValue> entry : sparkDriverConfig.entrySet()) {
+                String key = entry.getKey();
+                SparkDriverSettings settings = SparkDriverSettings.fromProperty(key);
+                if (settings != null) {
+                    sparkDriverMap.put(settings.option, entry.getValue().unwrapped().toString());
+
+                }
+            }
+        }
+
+        if (!sparkDriverMap.containsKey(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option)) {
+            sparkDriverMap.put(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option, variables);
+        } else {
+            sparkDriverMap.put(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option,
+                    sparkDriverMap.get(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option) + " " + variables);
+        }
+
+        StringBuilder stringBuilder = new StringBuilder();
+        for (Map.Entry<String, String> c : sparkDriverMap.entrySet()) {
+            String v = addLogPropertiesIfNeeded(c.getKey(), c.getValue());
+            String conf = String.format(" %s=\"%s\" ", c.getKey(), v);
+            stringBuilder.append(conf);
+        }
+
+        return stringBuilder.toString();
+
+    }
+
+    /**
+     * if log4j.configuration is not specified, set default file path
+     * @param key
+     * @param value
+     * @return
+     */
     private static String addLogPropertiesIfNeeded(String key, String value) {
-        if (!value.contains("-Dlog4j.configuration")) {
-            if (key.equals(spark_driver_extraJavaOptions)
-                    || key.equals(spark_executor_extraJavaOptions) ) {
+        if (key.equals(SparkDriverSettings.DRIVER_JAVA_OPTIONS.option)
+                || key.equals(spark_executor_extraJavaOptions)) {
+            if (!value.contains("-Dlog4j.configuration")) {
                 return value + " " + logConfiguration();
             }
         }
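Net effect of the rewritten class: driver-side settings are printed first as dedicated spark-submit flags (via exposeSparkDriverConf), everything else stays in --conf form, and the user-supplied -D variables are appended to both the driver and executor extraJavaOptions. A condensed, self-contained sketch of that flow, with hypothetical names and sample values and the app-name/master handling omitted:

import java.util.HashMap;
import java.util.Map;

// Condensed sketch of the patched flow; names and sample values are
// hypothetical, not the project's code.
public class ExposeSparkConfSketch {
    public static void main(String[] args) {
        String variables = "-Dcity=hangzhou"; // stand-in for args[1]
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.executor.memory", "1g");
        conf.put("spark.driver.extraJavaOptions", "-Dfoo=bar");

        // Driver options leave the --conf form: in client mode the driver JVM
        // is already running when SparkConf is read, so the commit emits
        // --driver-java-options instead of spark.driver.extraJavaOptions.
        String driverOpts = conf.remove("spark.driver.extraJavaOptions");
        driverOpts = (driverOpts == null) ? variables : driverOpts + " " + variables;

        // Executor options stay under --conf, with the variables appended.
        conf.merge("spark.executor.extraJavaOptions", variables, (old, v) -> old + " " + v);

        StringBuilder out = new StringBuilder();
        out.append(String.format(" --driver-java-options=\"%s\" ", driverOpts));
        for (Map.Entry<String, String> c : conf.entrySet()) {
            out.append(String.format(" --conf %s=\"%s\"", c.getKey(), c.getValue()));
        }
        System.out.print(out);
        // Prints something like:
        //  --driver-java-options="-Dfoo=bar -Dcity=hangzhou"
        //  --conf spark.executor.memory="1g"
        //  --conf spark.executor.extraJavaOptions="-Dcity=hangzhou"
    }
}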

io/github/interestinglab/waterdrop/config/ExposeSparkDriverConf.java

This file was deleted.

io/github/interestinglab/waterdrop/config/SparkDriverSettings.java

@@ -24,8 +24,11 @@
 public enum SparkDriverSettings {
 
     DRIVER_MEMORY("spark.driver.memory", "--driver-memory"),
+
     DRIVER_JAVA_OPTIONS("spark.driver.extraJavaOptions", "--driver-java-options"),
+
     DRIVER_LIBRARY_PATH("spark.driver.extraLibraryPath", "--driver-library-path"),
+
     DRIVER_CLASS_PATH("spark.driver.extraClassPath", "--driver-class-path");
 
 
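exposeSparkDriverConf resolves each spark.driver.* key through SparkDriverSettings.fromProperty, whose implementation sits outside this diff. A plausible sketch of that lookup, assuming it matches on the property name (the `option` field name is taken from its use in ExposeSparkConf above):

// Sketch of the likely fromProperty lookup (its body is outside this diff);
// the field names `property` and `option` follow the usage in ExposeSparkConf.
public enum SparkDriverSettingsSketch {
    DRIVER_MEMORY("spark.driver.memory", "--driver-memory"),
    DRIVER_JAVA_OPTIONS("spark.driver.extraJavaOptions", "--driver-java-options"),
    DRIVER_LIBRARY_PATH("spark.driver.extraLibraryPath", "--driver-library-path"),
    DRIVER_CLASS_PATH("spark.driver.extraClassPath", "--driver-class-path");

    public final String property; // Spark configuration key
    public final String option;   // equivalent spark-submit flag

    SparkDriverSettingsSketch(String property, String option) {
        this.property = property;
        this.option = option;
    }

    public static SparkDriverSettingsSketch fromProperty(String property) {
        for (SparkDriverSettingsSketch s : values()) {
            if (s.property.equals(property)) {
                return s; // e.g. "spark.driver.memory" -> DRIVER_MEMORY
            }
        }
        return null; // exposeSparkDriverConf skips keys it cannot map
    }
}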
