diff --git a/core/pom.xml b/core/pom.xml index ea3f3634..546d6d02 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -32,6 +32,7 @@ 2.12.15 2.13.5 2.12 + 5.10.1 @@ -179,6 +180,19 @@ jackson-module-jaxb-annotations ${jackson.version} + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-api + ${junit-jupiter.version} + test + diff --git a/core/raydp-main/pom.xml b/core/raydp-main/pom.xml index fe3a068a..725cb680 100644 --- a/core/raydp-main/pom.xml +++ b/core/raydp-main/pom.xml @@ -165,6 +165,16 @@ ${jackson.version} + + org.junit.jupiter + junit-jupiter + test + + + org.junit.jupiter + junit-jupiter-api + test + diff --git a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index 3c0d0f12..dce83a78 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -36,7 +36,7 @@ import org.apache.spark.raydp.SparkOnRayConfigs import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils} import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.util.Utils /** @@ -171,7 +171,7 @@ class RayCoarseGrainedSchedulerBackend( val resourcesInMap = transferResourceRequirements(executorResourceReqs) ++ raydpExecutorCustomResources - val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get + val numExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf); val sparkCoresPerExecutor = coresPerExecutor .getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR) val rayActorCPU = conf.get(SparkOnRayConfigs.SPARK_EXECUTOR_ACTOR_CPU_RESOURCE, diff --git a/core/raydp-main/src/main/test/org/apache/spark/scheduler/cluster/raydp/TestRayCoarseGrainedSchedulerBackend.java b/core/raydp-main/src/main/test/org/apache/spark/scheduler/cluster/raydp/TestRayCoarseGrainedSchedulerBackend.java new file mode 100644 index 00000000..44808733 --- /dev/null +++ b/core/raydp-main/src/main/test/org/apache/spark/scheduler/cluster/raydp/TestRayCoarseGrainedSchedulerBackend.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.raydp; + +import org.apache.spark.SparkConf; +import org.junit.jupiter.api.Test; + +import org.apache.spark.scheduler.cluster.SchedulerBackendUtils; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * This class performs unit testing on some methods in `RayCoarseGrainedSchedulerBackend`. + */ +public class TestRayCoarseGrainedSchedulerBackend { + + // Test using the default value. + @Test + public void testExecutorNumberWithDefaultConfig() { + SparkConf conf = new SparkConf(); + int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2); + assertEquals(2, executorNumber); + } + + // Test using a negative value. + @Test + public void testExecutorNumberWithNegativeConfig() { + SparkConf conf = new SparkConf(); + conf.set("spark.dynamicAllocation.initialExecutors", "-1"); + int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2); + assertEquals(2, executorNumber); + } + + // Test using reasonable values. + @Test + public void testExecutorNumberWithValidConfig() { + SparkConf conf = new SparkConf(); + conf.set("spark.executor.instances", "5"); + int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2); + assertEquals(5, executorNumber); + } + + // Test using dynamic values. + @Test + public void testExecutorNumberWithDynamicConfig() { + SparkConf conf = new SparkConf(); + conf.set("spark.dynamicAllocation.enabled", "true"); + conf.set("spark.dynamicAllocation.minExecutors", "3"); + int executorNumber = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf, 2); + assertEquals(3, executorNumber); + } +}