
Commit 50c690d

mccheah authored and ash211 committed
Support spark.executor.extraJavaOptions.
(cherry picked from commit d0444b9)
1 parent 34d7af2 commit 50c690d

File tree

5 files changed: +59 -16 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 4 deletions
@@ -457,7 +457,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withValue(cp)
           .build()
       }
-    val requiredEnv = (Seq(
+    val executorExtraJavaOptionsEnv = conf
+      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -477,7 +486,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withNewFieldRef("v1", "status.podIP")
           .build())
         .build()
-      )
+      ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -497,8 +506,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
-      .addAllToEnv(requiredEnv.asJava)
-      .addToEnv(executorExtraClasspathEnv.toSeq: _*)
+      .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .build()

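For context on the scheduler-backend change above: each token of spark.executor.extraJavaOptions is assigned to its own numbered environment variable, named with ENV_JAVA_OPT_PREFIX plus the token's index (the Dockerfiles below expect that prefix to be SPARK_JAVA_OPT_). The sketch below is a minimal, self-contained illustration of that tokenize-and-index step, not the commit's code; its splitCommandString is a simplified, quote-aware stand-in for Spark's private[spark] Utils.splitCommandString.

    // Minimal sketch, not the commit's code: tokenize an extraJavaOptions string
    // (roughly as Utils.splitCommandString would) and emit one indexed
    // SPARK_JAVA_OPT_<i> environment-variable assignment per token.
    object JavaOptsEnvSketch {
      // Simplified quote-aware tokenizer; Spark's real Utils.splitCommandString
      // is private[spark] and handles more edge cases.
      def splitCommandString(s: String): Seq[String] = {
        val tokens = scala.collection.mutable.ArrayBuffer.empty[String]
        val current = new StringBuilder
        var quote: Option[Char] = None
        for (c <- s) quote match {
          case Some(q) if c == q => quote = None                 // closing quote
          case Some(_) => current += c                           // keep spaces inside quotes
          case None if c == '\'' || c == '"' => quote = Some(c)  // opening quote
          case None if c == ' ' =>
            if (current.nonEmpty) { tokens += current.result(); current.clear() }
          case None => current += c
        }
        if (current.nonEmpty) tokens += current.result()
        tokens.toSeq
      }

      def main(args: Array[String]): Unit = {
        val opts = "-DsimpleExecutorConf=v -D'executor conf with spaces'='spaced value'"
        splitCommandString(opts).zipWithIndex.foreach { case (opt, index) =>
          println(s"SPARK_JAVA_OPT_$index=$opt")
        }
        // Prints:
        // SPARK_JAVA_OPT_0=-DsimpleExecutorConf=v
        // SPARK_JAVA_OPT_1=-Dexecutor conf with spaces=spaced value
      }
    }
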
resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile

Lines changed: 3 additions & 2 deletions
@@ -38,9 +38,10 @@ ENV PYSPARK_PYTHON python
 ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile

Lines changed: 3 additions & 2 deletions
@@ -23,10 +23,11 @@ FROM spark-base
 
 COPY examples /opt/spark/examples
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

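Reading the new CMD lines in both Dockerfiles: env | grep SPARK_JAVA_OPT_ selects the numbered variables, the sed expression strips everything up to the first = so only each value remains, and readarray -t loads the file into a bash array one line per element. Expanding the array as "${SPARK_EXECUTOR_JAVA_OPTS[@]}" hands each option to java as a single argument, so values containing spaces survive intact; note the pipeline takes the options in whatever order env emits them rather than sorting by index. A rough Scala rendering of the same reconstruction, offered as a sketch that sorts explicitly:

    // Sketch of the consumer side (the Dockerfiles do this in bash): collect the
    // SPARK_JAVA_OPT_<i> values from an environment map in index order, each one
    // staying a single argument for the child JVM.
    object ReadJavaOptsSketch {
      private val Prefix = "SPARK_JAVA_OPT_"

      def executorJavaOpts(env: Map[String, String]): Seq[String] =
        env.collect {
          case (name, value)
              if name.startsWith(Prefix) && name.drop(Prefix.length).matches("\\d+") =>
            (name.drop(Prefix.length).toInt, value)
        }.toSeq.sortBy(_._1).map(_._2)

      def main(args: Array[String]): Unit = {
        val env = Map(
          "SPARK_JAVA_OPT_1" -> "-Dexecutor conf with spaces=spaced value",
          "SPARK_JAVA_OPT_0" -> "-DsimpleExecutorConf=v",
          "HOME" -> "/root")                     // unrelated variables are ignored
        executorJavaOpts(env).foreach(println)   // printed in index order
      }
    }
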
resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala

Lines changed: 28 additions & 5 deletions
@@ -29,28 +29,51 @@ private[spark] object JavaOptionsTest {
 
   def main(args: Array[String]): Unit = {
     // scalastyle:off println
-    if (args.length != 1) {
+    if (args.length != 2) {
       println(s"Invalid arguments: ${args.mkString(",")}." +
-        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+        s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
       System.exit(1)
     }
     val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
     val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
       case (optKey, optValue) => System.getProperty(optKey) != optValue
     }
     if (nonMatchingDriverOptions.nonEmpty) {
       println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
         s" But these options did not match: $nonMatchingDriverOptions.")
       val sysProps = Maps.fromProperties(System.getProperties).asScala
-      println("System properties are:")
+      println("Driver system properties are:")
       for (prop <- sysProps) {
         println(s"Key: ${prop._1}, Value: ${prop._2}")
       }
       System.exit(1)
     }
 
-    // TODO support spark.executor.extraJavaOptions and test here.
-    println(s"All expected JVM options were present on the driver and executors.")
+    val spark = SparkSession.builder().getOrCreate().sparkContext
+    try {
+      val nonMatchingExecutorOptions = spark.parallelize(Seq(0)).flatMap { _ =>
+        expectedExecutorJavaOptions.filter {
+          case (optKey, optValue) => System.getProperty(optKey) != optValue
+        }
+      }.collectAsMap()
+      if (nonMatchingExecutorOptions.nonEmpty) {
+        val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
+          Maps.fromProperties(System.getProperties).asScala
+        }.collectAsMap()
+        println(s"The executor's JVM options did not match. Expected" +
+          s" $expectedExecutorJavaOptions. But these options did not" +
+          s" match: $nonMatchingExecutorOptions.")
+        println("Executor system properties are:")
+        for (prop <- executorSysProps) {
+          println(s"Key: ${prop._1}, Value: ${prop._2}")
+        }
+      } else {
+        println("All expected JVM options were present on the driver and executors.")
+      }
+    } finally {
+      spark.stop()
+    }
     // scalastyle:on println
   }
 

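A note on how the rewritten JavaOptionsTest checks the executor side: there is no direct handle on an executor JVM, so the test parallelizes a one-element collection and performs the System.getProperty comparison inside the closure, which Spark ships to an executor; collectAsMap() then brings any mismatches back to the driver. A stripped-down sketch of that probing pattern (names here are illustrative, not from the commit):

    // Sketch of the probe-the-executor pattern used in JavaOptionsTest: a
    // one-element RDD forces the closure onto an executor, and collect() ships
    // the observed value back to the driver.
    import org.apache.spark.SparkContext

    object ExecutorProbeSketch {
      def executorProperty(sc: SparkContext, key: String): Option[String] =
        sc.parallelize(Seq(0), numSlices = 1)
          .map(_ => Option(System.getProperty(key)))  // runs in the executor JVM
          .collect()
          .head
    }
    // e.g. executorProperty(sc, "simpleExecutorConf") should yield
    // Some("simpleExecutorConfValue") once the SPARK_JAVA_OPT_ plumbing works.
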
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 13 additions & 3 deletions
@@ -231,18 +231,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
-          "driverconfwithspaces" -> "driver conf with spaces value"),
+        "driverconfwithspaces" -> "driver conf with spaces value"),
       "driver-jvm-options.properties",
       "JVM options that should be set on the driver.")
+    val executorJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleExecutorConf" -> "simpleExecutorConfValue",
+        "executor conf with spaces" -> "executor conf with spaces value"),
+      "executor-jvm-options.properties",
+      "JVM options that should be set on the executors.")
     sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
       "-DsimpleDriverConf=simpleDriverConfValue" +
         " -Ddriverconfwithspaces='driver conf with spaces value'")
-    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+      "-DsimpleExecutorConf=simpleExecutorConfValue" +
+        " -D\'executor conf with spaces\'=\'executor conf with spaces value\'")
+    sparkConf.set("spark.files",
+      Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
+        .mkString(","))
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       JAVA_OPTIONS_MAIN_CLASS,
       Seq(s"All expected JVM options were present on the driver and executors."),
-      Array(driverJvmOptionsFile.getName),
+      Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
       Seq.empty[String])
   }

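The storeJvmOptionsInTempFile helper used in this test is defined elsewhere in the suite and is not part of this diff. Since JavaOptionsTest reads the files back with loadPropertiesFromFile, a plausible minimal shape, offered purely as a hypothetical sketch, is a java.util.Properties round trip:

    // Hypothetical sketch only; the real helper's body is not in this commit.
    import java.io.{File, FileOutputStream}
    import java.util.Properties

    object JvmOptionsFileSketch {
      def storeJvmOptionsInTempFile(
          options: Map[String, String],
          fileName: String,
          comment: String): File = {
        val dir = java.nio.file.Files.createTempDirectory("jvm-opts").toFile
        val file = new File(dir, fileName)
        val properties = new Properties()
        options.foreach { case (k, v) => properties.setProperty(k, v) }
        val out = new FileOutputStream(file)
        try properties.store(out, comment) finally out.close()
        file
      }
    }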