Commit

Resolve NPE Caused by Missing EXECUTOR_INSTANCES in Spark Settings. (#422)

* Resolve NPE Caused by Missing EXECUTOR_INSTANCES in Spark Settings.

* Add comment.

* Improve Some Code.

* Fix CheckStyle Issue.

* Fix CheckStyle Issue.

* Add Some Junit Test.

* Add Some Comment.
slfan1989 authored Feb 27, 2025
1 parent 374f858 commit f31fec6
Showing 4 changed files with 92 additions and 2 deletions.
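A note on the fix for readers of this diff: when spark.executor.instances is not set, the old backend code unwrapped the optional config value directly and crashed; the change delegates to Spark's SchedulerBackendUtils.getInitialTargetExecutorNumber, which falls back to a default (and to the dynamic-allocation settings when that feature is enabled), the behavior the new tests below exercise. The following is a minimal sketch of that fallback pattern using only the public SparkConf API; the helper name resolveExecutorNumber is hypothetical and not part of the commit.

import org.apache.spark.SparkConf

object ExecutorNumberFallbackSketch {
  // Hypothetical helper mirroring the fallback this change relies on:
  // use spark.executor.instances when present, otherwise a caller-supplied default.
  def resolveExecutorNumber(conf: SparkConf, default: Int = 2): Int =
    conf.getOption("spark.executor.instances").map(_.toInt).getOrElse(default)

  def main(args: Array[String]): Unit = {
    val unset = new SparkConf()                 // spark.executor.instances missing
    println(resolveExecutorNumber(unset))       // 2 -- default applies, no crash
    val explicit = new SparkConf().set("spark.executor.instances", "5")
    println(resolveExecutorNumber(explicit))    // 5
  }
}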
14 changes: 14 additions & 0 deletions core/pom.xml
@@ -32,6 +32,7 @@
<scala.version>2.12.15</scala.version>
<jackson.version>2.13.5</jackson.version>
<scala.binary.version>2.12</scala.binary.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
</properties>

<modules>
@@ -179,6 +180,19 @@
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

10 changes: 10 additions & 0 deletions core/raydp-main/pom.xml
@@ -165,6 +165,16 @@
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
4 changes: 2 additions & 2 deletions RayCoarseGrainedSchedulerBackend.scala
@@ -36,7 +36,7 @@ import org.apache.spark.raydp.SparkOnRayConfigs
import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.util.Utils

/**
@@ -171,7 +171,7 @@ class RayCoarseGrainedSchedulerBackend(

val resourcesInMap = transferResourceRequirements(executorResourceReqs) ++
raydpExecutorCustomResources
-val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get
+val numExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf);
val sparkCoresPerExecutor = coresPerExecutor
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
val rayActorCPU = conf.get(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE,
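Note on the one-line change above: conf.get(config.EXECUTOR_INSTANCES) yields an optional value, and unwrapping it with .get fails the moment the setting is absent, which is the failure this commit addresses; getInitialTargetExecutorNumber resolves the count with a default instead. A tiny, generic Scala illustration of the difference, not taken from the commit:

val missing: Option[Int] = None            // spark.executor.instances not configured
// missing.get                             // throws: there is no value to unwrap
val withDefault = missing.getOrElse(2)     // 2 -- safe fallback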
66 changes: 66 additions & 0 deletions TestRayCoarseGrainedSchedulerBackend.java
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.raydp;

import org.apache.spark.SparkConf;
import org.junit.jupiter.api.Test;

import org.apache.spark.scheduler.cluster.SchedulerBackendUtils;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * This class performs unit testing on some methods in `RayCoarseGrainedSchedulerBackend`.
 */
public class TestRayCoarseGrainedSchedulerBackend {

    // Test using the default value.
    @Test
    public void testExecutorNumberWithDefaultConfig() {
        SparkConf conf = new SparkConf();
        int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2);
        assertEquals(2, executorNumber);
    }

    // Test using a negative value.
    @Test
    public void testExecutorNumberWithNegativeConfig() {
        SparkConf conf = new SparkConf();
        conf.set("spark.dynamicAllocation.initialExecutors", "-1");
        int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2);
        assertEquals(2, executorNumber);
    }

    // Test using reasonable values.
    @Test
    public void testExecutorNumberWithValidConfig() {
        SparkConf conf = new SparkConf();
        conf.set("spark.executor.instances", "5");
        int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2);
        assertEquals(5, executorNumber);
    }

    // Test using dynamic values.
    @Test
    public void testExecutorNumberWithDynamicConfig() {
        SparkConf conf = new SparkConf();
        conf.set("spark.dynamicAllocation.enabled", "true");
        conf.set("spark.dynamicAllocation.minExecutors", "3");
        int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2);
        assertEquals(3, executorNumber);
    }
}
