
Resolve NPE Caused by Missing EXECUTOR_INSTANCES in Spark Settings. #422

Merged
7 commits merged into oap-project:master on Feb 27, 2025

Conversation

slfan1989
Contributor

@slfan1989 slfan1989 commented Feb 20, 2025

Background

During our testing, we found that a NullPointerException is thrown when using RayDP without specifying the spark.executor.instances parameter.

I believe we should set a default value for this parameter: if the user does not specify it, we should fall back to the default instead of throwing an exception. This would improve the user experience.

The error message is as follows:

25/02/20 22:58:59 ERROR SparkContext: Error initializing SparkContext.
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.scheduler.cluster.raydp.RayCoarseGrainedSchedulerBackend.start(RayCoarseGrainedSchedulerBackend.scala:174)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:745)
25/02/20 22:58:59 INFO AbstractConnector: Stopped Spark@22f7fd5e{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
25/02/20 22:59:00 ERROR Utils: Uncaught exception in thread Thread-4
java.lang.NullPointerException
	at org.apache.spark.scheduler.cluster.raydp.RayCoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$raydp$RayCoarseGrainedSchedulerBackend$$stop(RayCoarseGrainedSchedulerBackend.scala:309)
	at org.apache.spark.scheduler.cluster.raydp.RayCoarseGrainedSchedulerBackend.stop(RayCoarseGrainedSchedulerBackend.scala:197)
	at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:927)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2516)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2089)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2089)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:680)
....
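
For context, the None.get at RayCoarseGrainedSchedulerBackend.scala:174 points at an optional config being read without a fallback. A minimal Scala illustration of the failure mode (the actual RayDP code may differ):

import org.apache.spark.SparkConf

val conf = new SparkConf()  // spark.executor.instances not set
// getOption returns Option[String]; calling .get on a None throws
// java.util.NoSuchElementException: None.get, matching the trace above.
val numExecutors = conf.getOption("spark.executor.instances").get.toInt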

@slfan1989 slfan1989 marked this pull request as draft February 20, 2025 10:39
@slfan1989
Contributor Author

@carsonwang @pang-wu Could you please take a look at this PR? Thank you very much!

@slfan1989 slfan1989 marked this pull request as ready for review February 20, 2025 21:52
@cawangyz

can you share the program that can reproduce the issue? spark.executor.instances should be set automatically when you call raydp.init_spark with num_executors passed in.

@slfan1989
Copy link
Contributor Author

can you share the program that can reproduce the issue? spark.executor.instances should be set automatically when you call raydp.init_spark with num_executors passed in.

@cawangyz Thank you for your response! In our use case, a large number of Ray tasks are primarily used for running XGBoost, but some of the data-preparation work has not yet been integrated into the RayJob, and we are trying to bring these ETL tasks into the RayJob process. The tasks consist of pre-written scripts, and we would like to execute them directly with raydp-submit, similar to how spark-submit is used on YARN.

The command we submit looks similar to this:

raydp-submit --ray-conf /root/ray.conf \
     --conf spark.driver.memory=6g \
     --conf spark.executor.memory=4g \
     --conf spark.executor.cores=2 \
     --conf spark.executor.instances=3 \
     --files param.json \
     --py-files py_file.zip \
     main.py --parameter_file=param.json

Previously, when we ran the scripts this way, we encountered an exception if the --conf spark.executor.instances parameter was not specified.

To validate this, we can use the example case below directly. If we remove the --conf spark.executor.instances=1 line, we encounter the error shown above.

from os.path import dirname
import sys
import json
import subprocess
import ray
import pyspark
ray.init(address="auto")
node = ray.worker.global_worker.node
options = {}
options["ray"] = {}
options["ray"]["run-mode"] = "CLUSTER"
options["ray"]["node-ip"] = node.node_ip_address
options["ray"]["address"] = node.address
options["ray"]["session-dir"] = node.get_session_dir_path()
ray.shutdown()
conf_path = dirname(__file__) + "/ray.conf"
with open(conf_path, "w") as f:
    json.dump(options, f)
command = ["bin/raydp-submit", "--ray-conf", conf_path]
command += ["--conf", "spark.executor.cores=1"]
command += ["--conf", "spark.executor.instances=1"]
command += ["--conf", "spark.executor.memory=500m"]
example_path = dirname(pyspark.__file__)
# run SparkPi as example
command.append(example_path + "/examples/src/main/python/pi.py")
sys.exit(subprocess.run(command, check=True).returncode)

From a user's perspective, for newly written code we would follow the official documentation to create new applications; the examples provided by RayDP are already quite comprehensive. For older code, however, we would prefer to run it directly in a compatible way. In practice, we might forget to set spark.executor.instances for a Spark task, and having a default value in that case would be much better.

@pang-wu
Contributor

pang-wu commented Feb 22, 2025

My two cents: I think our behavior should align with what Spark's default is, which should be 2 on YARN?
Also what would happen if the user set spark.dynamicAllocation.initialExecutors? I think we want to check that first: If that value is not set, we set executor instance to raydp's default, otherwise set it to spark.dynamicAllocation.initialExecutors?

@slfan1989
Contributor Author

My two cents: I think our behavior should align with what Spark's default is, which should be 2 on YARN?
Also what would happen if the user set spark.dynamicAllocation.initialExecutors? I think we want to check that first: If that value is not set, we set executor instance to raydp's default, otherwise set it to spark.dynamicAllocation.initialExecutors?

@pang-wu Thank you for your suggestion! The default value of 2 is indeed reasonable. Regarding how to determine the number of executors, I also agree that we should first check the spark.dynamicAllocation.initialExecutors configuration. We can directly use Spark's built-in method to get the number of executors, which meets our needs: it automatically checks whether dynamic allocation is enabled and, if so, fetches the corresponding number of executors; otherwise it returns the default value.
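
A minimal sketch of how the fallback could look, assuming the backend reuses Spark's internal SchedulerBackendUtils.getInitialTargetExecutorNumber helper (Spark-internal, but reachable from the RayDP scheduler backend's package under org.apache.spark); the actual patch may differ:

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils

def targetExecutors(conf: SparkConf): Int = {
  // Returns spark.executor.instances when it is set; with dynamic allocation
  // enabled it returns the initial executor count derived from
  // spark.dynamicAllocation.initialExecutors; otherwise it falls back to
  // Spark's default of 2 executors.
  SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
}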

Contributor

@pang-wu pang-wu left a comment

The change looks good, can we also write a test?

@slfan1989
Contributor Author

The change looks good, can we also write a test?

Thank you for your response! I will add a unit test.
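
For reference, a rough sketch of the kind of assertions such a test could make, assuming it exercises the same fallback helper (names and placement here are illustrative, not the actual test added in this PR):

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils

// No spark.executor.instances configured: expect Spark's default of 2.
assert(SchedulerBackendUtils.getInitialTargetExecutorNumber(new SparkConf()) == 2)

// An explicit setting takes precedence over the default.
val conf = new SparkConf().set("spark.executor.instances", "3")
assert(SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) == 3)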

@slfan1989
Contributor Author

@pang-wu @cawangyz I added some unit tests. Could you review this PR again? Thank you very much!

Collaborator

@carsonwang carsonwang left a comment

Looks good. Thank you all! Merging this.

@carsonwang carsonwang merged commit f31fec6 into oap-project:master Feb 27, 2025
16 checks passed
@slfan1989
Contributor Author

@cawangyz @pang-wu Thanks for the review!
