Merge pull request apache#246 from palantir/resync-kube
[NOSQUASH] Resync from apache-spark-on-k8s upstream
ash211 authored Aug 21, 2017
2 parents 692e6f8 + 50c690d commit 701bd2a
Showing 34 changed files with 1,202 additions and 102 deletions.
@@ -653,7 +653,9 @@ object SparkSubmit extends CommandLineUtils {
     if (args.isPython) {
       childArgs ++= Array("--primary-py-file", args.primaryResource)
       childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
-      childArgs ++= Array("--other-py-files", args.pyFiles)
+      if (args.pyFiles != null) {
+        childArgs ++= Array("--other-py-files", args.pyFiles)
+      }
     } else {
       childArgs ++= Array("--primary-java-resource", args.primaryResource)
       childArgs ++= Array("--main-class", args.mainClass)
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
@@ -770,6 +770,22 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Executor process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
</table>
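
For illustration only, a minimal sketch of how these two property families might be set programmatically; the variable names and values below are made up, and the same settings can equally be passed with `--conf` on spark-submit:

```scala
import org.apache.spark.SparkConf

// Hypothetical environment variables, purely for illustration.
val conf = new SparkConf()
  .set("spark.executorEnv.DATA_DIR", "/tmp/data")         // exported into every executor pod
  .set("spark.kubernetes.driverEnv.LOG_LEVEL", "DEBUG")   // exported into the driver pod
```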


2 changes: 1 addition & 1 deletion pom.xml
@@ -2358,7 +2358,7 @@
 <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
 <junitxml>.</junitxml>
 <filereports>SparkTestSuite.txt</filereports>
-<argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraScalaTestArgs}</argLine>
+<argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=${CodeCacheSize} ${extraScalaTestArgs}</argLine>
 <stderr/>
 <environmentVariables>
 <!--
2 changes: 2 additions & 0 deletions resource-managers/kubernetes/README.md
@@ -64,10 +64,12 @@ build/mvn integration-test \
# Running against an arbitrary cluster

In order to run against any cluster, use the following:
```sh
build/mvn integration-test \
  -Pkubernetes -Pkubernetes-integration-tests \
  -pl resource-managers/kubernetes/integration-tests -am \
  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
```

# Preserve the Minikube VM

@@ -0,0 +1,29 @@
---
layout: global
title: Kubernetes Implementation of the External Shuffle Service
---

# External Shuffle Service

The `KubernetesExternalShuffleService` was added to allow Spark to use Dynamic Allocation Mode when
running in Kubernetes. The shuffle service is responsible for persisting shuffle files beyond the
lifetime of the executors, allowing the number of executors to scale up and down without losing
computation.

The chosen implementation is a DaemonSet that runs a shuffle-service pod on each node. Shuffle-service
pods and executor pods that land on the same node share disk through hostPath volumes. Spark requires
that each executor know the IP address of the shuffle-service pod that shares its disk.

The user specifies which shuffle-service pods the executors of a particular Spark job should use
through two new properties (a configuration sketch follows the list below):

* spark.kubernetes.shuffle.service.labels
* spark.kubernetes.shuffle.namespace
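
As an illustration (not taken from this change), the two properties might be combined with dynamic allocation roughly as below; the label value and namespace are assumptions that must match the deployed shuffle-service DaemonSet:

```scala
import org.apache.spark.SparkConf

// Illustrative values: "app=spark-shuffle-service" stands in for whatever labels the
// shuffle-service DaemonSet pods carry, and "default" is just an example namespace.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.kubernetes.shuffle.service.labels", "app=spark-shuffle-service")
  .set("spark.kubernetes.shuffle.namespace", "default")
```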

KubernetesClusterSchedulerBackend tracks the shuffle-service pods in a particular namespace and the
nodes they run on. It uses this data to configure each executor pod to connect to the shuffle service
that is co-located with it on the same node.

There is additional logic in the `KubernetesExternalShuffleService` to watch the Kubernetes API,
detect failures, and proactively clean up files in those error cases.
@@ -0,0 +1,49 @@
---
layout: global
title: Kubernetes Implementation of the Spark Scheduler Backend
---

# Scheduler Backend

The general idea is to run Spark drivers and executors inside Kubernetes [Pods](https://kubernetes.io/docs/concepts/workloads/pods/pod/).
A pod is a co-located and co-scheduled group of one or more containers running in a shared context. The main component is KubernetesClusterSchedulerBackend,
an implementation of CoarseGrainedSchedulerBackend, which manages allocating and destroying executors via the Kubernetes API.
There are auxiliary and optional components: `ResourceStagingServer` and `KubernetesExternalShuffleService`, which serve specific purposes described further below.

The scheduler backend is invoked in the driver associated with a particular job. The driver may run outside the cluster (client mode) or within (cluster mode).
The scheduler backend manages a [pod](http://kubernetes.io/docs/user-guide/pods/) for each executor.
The executor code runs inside a Kubernetes pod but remains unmodified and unaware of the orchestration layer.
When a job is running, the scheduler backend configures and creates executor pods with the following properties:

- The pod's container runs a pre-built Docker image containing a Spark distribution (with Kubernetes integration) and
invokes the Java runtime with the CoarseGrainedExecutorBackend main class.
- The scheduler backend specifies environment variables on the executor pod to configure its runtime,
particularly its JVM options, number of cores, heap size, and the driver's hostname.
- The executor container has [resource limits and requests](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container)
that are set in accordance with the resource limits specified in the Spark configuration (executor.cores and executor.memory in the application's SparkConf).
- The executor pods may also be launched into a particular [Kubernetes namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/),
or target a particular subset of nodes in the Kubernetes cluster, based on the Spark configuration supplied. A sketch of such a pod specification follows this list.
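
For illustration only, a minimal sketch of the kind of executor pod specification described above, written with the fabric8 Kubernetes client builders. The pod name, image, environment variable names, and resource sizes are assumptions, not the backend's actual values:

```scala
import io.fabric8.kubernetes.api.model.{PodBuilder, Quantity}

// Illustrative executor pod: the name, image, env var names, and sizes are made up.
val executorPod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-executor-1")
    .endMetadata()
  .withNewSpec()
    .addNewContainer()
      .withName("executor")
      .withImage("my-registry/spark-executor:latest")
      // Environment variables configure the executor's runtime (cores, memory, driver address, ...).
      .addNewEnv().withName("SPARK_EXECUTOR_CORES").withValue("1").endEnv()
      .addNewEnv().withName("SPARK_DRIVER_URL").withValue("spark://driver-host:7078").endEnv()
      // Resource requests/limits derived from executor.cores and executor.memory.
      .withNewResources()
        .addToRequests("cpu", new Quantity("1"))
        .addToRequests("memory", new Quantity("1408Mi"))
        .addToLimits("memory", new Quantity("1408Mi"))
        .endResources()
      .endContainer()
    .endSpec()
  .build()
```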

## Requesting Executors

Spark requests new executors through the `doRequestTotalExecutors(numExecutors: Int)` method.
The scheduler backend keeps track of the total number of executors requested by Spark core.

A separate kubernetes-pod-allocator thread handles the creation of new executor pods with appropriate throttling and monitoring.
This indirection is required because the Kubernetes API server accepts requests for new executor pods optimistically, with the
anticipation of being able to eventually run them. However, it is undesirable to have a very large number of pods that cannot be
scheduled and stay pending within the cluster. Hence, the kubernetes-pod-allocator uses the Kubernetes API to decide whether to
submit new requests for executors based on whether previous pod creation requests have completed. This gives control over how
fast a job scales up (the rate is configurable) and helps prevent Spark jobs from overwhelming the Kubernetes API server with pod creation requests.
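
A simplified, self-contained sketch of this throttling idea follows; the `ExecutorPodApi` trait and all names in it are invented for illustration and are not Spark's actual internals:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Minimal stand-in for the Kubernetes API interactions the allocator needs.
trait ExecutorPodApi {
  def createExecutorPod(id: Int): Unit   // submit a pod creation request to the API server
  def pendingExecutorPods(): Int         // pods requested earlier but not yet running
}

class ThrottledPodAllocator(api: ExecutorPodApi, batchSize: Int) {
  private val nextId = new AtomicInteger(0)
  private var requestedTotal = 0

  /** Invoked periodically by a kubernetes-pod-allocator-style thread. */
  def maybeAllocate(targetTotal: Int): Unit = synchronized {
    // Only submit a new batch once earlier requests have been satisfied, so a busy
    // cluster is not flooded with executor pods that would sit in Pending forever.
    if (api.pendingExecutorPods() == 0 && requestedTotal < targetTotal) {
      val toCreate = math.min(batchSize, targetTotal - requestedTotal)
      (1 to toCreate).foreach(_ => api.createExecutorPod(nextId.incrementAndGet()))
      requestedTotal += toCreate
    }
  }
}
```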

## Destroying Executors

Spark requests deletion of executors through the `doKillExecutors(executorIds: List[String])`
method.

The implementation performs the inverse of executor creation: when the executor allocation manager
wants to remove executors from the application, the scheduler backend finds the pods that are running
those executors and tells the API server to delete them.
It is worth noting that this code does not have to decide which executors should be removed. By the
time `doKillExecutors()` is called, the executors to be removed have already been selected by the
CoarseGrainedSchedulerBackend and ExecutorAllocationManager.
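
A hedged sketch of this deletion path, again with an invented interface rather than Spark's real internals:

```scala
// Hypothetical registry: maps executor IDs to the pods backing them and deletes
// those pods through the API server. Names are illustrative only.
trait ExecutorPodRegistry {
  def podNameForExecutor(executorId: String): Option[String]
  def deletePod(podName: String): Unit
}

class PodKiller(registry: ExecutorPodRegistry) {
  /** Mirrors the shape of doKillExecutors: the IDs have already been chosen upstream. */
  def killExecutors(executorIds: List[String]): Unit = {
    executorIds.foreach { id =>
      registry.podNameForExecutor(id).foreach(registry.deletePod)
    }
  }
}
```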