@@ -663,26 +663,31 @@ def run_job_with_managed_cluster(
        raise ValueError("job_config.entrypoint must be specified.")

    # Warn if Pydantic V1/V2 specific fields in RayJobSpec are set, as they are not used for RayJob CR.
-    if job_config.entrypoint_num_cpus is not None or \
-       job_config.entrypoint_num_gpus is not None or \
-       job_config.entrypoint_memory is not None:
+    if (
+        job_config.entrypoint_num_cpus is not None
+        or job_config.entrypoint_num_gpus is not None
+        or job_config.entrypoint_memory is not None
+    ):
        warnings.warn(
            "RayJobSpec fields 'entrypoint_num_cpus', 'entrypoint_num_gpus', 'entrypoint_memory' "
            "are not directly used when creating a RayJob CR. They are primarily for the Ray Job Submission Client. "
            "Resource requests for the job driver pod should be configured in the RayCluster head node spec via ClusterConfiguration.",
-            UserWarning
+            UserWarning,
        )

    # Generate rayClusterSpec from ClusterConfiguration
    temp_config_for_spec = copy.deepcopy(cluster_config)
    temp_config_for_spec.appwrapper = False
-
+
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)
        dummy_cluster_for_spec = Cluster(temp_config_for_spec)

    ray_cluster_cr_dict = dummy_cluster_for_spec.resource_yaml
-    if not isinstance(ray_cluster_cr_dict, dict) or "spec" not in ray_cluster_cr_dict:
+    if (
+        not isinstance(ray_cluster_cr_dict, dict)
+        or "spec" not in ray_cluster_cr_dict
+    ):
        raise ValueError(
            "Failed to generate RayCluster CR dictionary from ClusterConfiguration. "
            f"Got: {type(ray_cluster_cr_dict)}"
@@ -691,13 +696,15 @@ def run_job_with_managed_cluster(

    # Prepare RayJob CR
    actual_job_cr_name = job_cr_name or f"rayjob-{uuid.uuid4().hex[:10]}"
-
+
    runtime_env_yaml_str = ""
    if job_config.runtime_env:
        try:
            runtime_env_yaml_str = yaml.dump(job_config.runtime_env)
        except yaml.YAMLError as e:
-            raise ValueError(f"Invalid job_config.runtime_env, failed to dump to YAML: {e}")
+            raise ValueError(
+                f"Invalid job_config.runtime_env, failed to dump to YAML: {e}"
+            )

    ray_job_cr_spec = {
        "entrypoint": job_config.entrypoint,
@@ -735,7 +742,9 @@ def run_job_with_managed_cluster(
    ray_cluster_name_actual = None

    try:
-        print(f"Submitting RayJob '{actual_job_cr_name}' to namespace '{namespace}'...")
+        print(
+            f"Submitting RayJob '{actual_job_cr_name}' to namespace '{namespace}'..."
+        )
        k8s_co_api.create_namespaced_custom_object(
            group="ray.io",
            version="v1",
@@ -750,27 +759,37 @@ def run_job_with_managed_cluster(
            start_time = time.time()
            while True:
                try:
-                    ray_job_status_cr = k8s_co_api.get_namespaced_custom_object_status(
-                        group="ray.io",
-                        version="v1",
-                        namespace=namespace,
-                        plural="rayjobs",
-                        name=actual_job_cr_name,
+                    ray_job_status_cr = (
+                        k8s_co_api.get_namespaced_custom_object_status(
+                            group="ray.io",
+                            version="v1",
+                            namespace=namespace,
+                            plural="rayjobs",
+                            name=actual_job_cr_name,
+                        )
                    )
                except ApiException as e:
                    if e.status == 404:
-                        print(f"RayJob '{actual_job_cr_name}' status not found yet, retrying...")
+                        print(
+                            f"RayJob '{actual_job_cr_name}' status not found yet, retrying..."
+                        )
                        time.sleep(job_polling_interval_seconds)
                        continue
                    raise

                status_field = ray_job_status_cr.get("status", {})
-                job_deployment_status = status_field.get("jobDeploymentStatus", "UNKNOWN")
+                job_deployment_status = status_field.get(
+                    "jobDeploymentStatus", "UNKNOWN"
+                )
                current_job_status = status_field.get("jobStatus", "PENDING")
-
+
                dashboard_url = status_field.get("dashboardURL", dashboard_url)
-                ray_cluster_name_actual = status_field.get("rayClusterName", ray_cluster_name_actual)
-                returned_job_submission_id = status_field.get("jobId", job_config.submission_id)
+                ray_cluster_name_actual = status_field.get(
+                    "rayClusterName", ray_cluster_name_actual
+                )
+                returned_job_submission_id = status_field.get(
+                    "jobId", job_config.submission_id
+                )

                final_job_status = current_job_status
                print(
@@ -779,41 +798,72 @@ def run_job_with_managed_cluster(

                if current_job_status in ["SUCCEEDED", "FAILED", "STOPPED"]:
                    break
-
-                if job_timeout_seconds and (time.time() - start_time) > job_timeout_seconds:
+
+                if (
+                    job_timeout_seconds
+                    and (time.time() - start_time) > job_timeout_seconds
+                ):
                    try:
-                        ray_job_status_cr_final = k8s_co_api.get_namespaced_custom_object_status(
-                            group="ray.io", version="v1", namespace=namespace, plural="rayjobs", name=actual_job_cr_name
+                        ray_job_status_cr_final = (
+                            k8s_co_api.get_namespaced_custom_object_status(
+                                group="ray.io",
+                                version="v1",
+                                namespace=namespace,
+                                plural="rayjobs",
+                                name=actual_job_cr_name,
+                            )
+                        )
+                        status_field_final = ray_job_status_cr_final.get(
+                            "status", {}
+                        )
+                        final_job_status = status_field_final.get(
+                            "jobStatus", final_job_status
+                        )
+                        returned_job_submission_id = status_field_final.get(
+                            "jobId", returned_job_submission_id
+                        )
+                        dashboard_url = status_field_final.get(
+                            "dashboardURL", dashboard_url
+                        )
+                        ray_cluster_name_actual = status_field_final.get(
+                            "rayClusterName", ray_cluster_name_actual
                        )
-                        status_field_final = ray_job_status_cr_final.get("status", {})
-                        final_job_status = status_field_final.get("jobStatus", final_job_status)
-                        returned_job_submission_id = status_field_final.get("jobId", returned_job_submission_id)
-                        dashboard_url = status_field_final.get("dashboardURL", dashboard_url)
-                        ray_cluster_name_actual = status_field_final.get("rayClusterName", ray_cluster_name_actual)
                    except Exception:
                        pass
                    raise TimeoutError(
                        f"RayJob '{actual_job_cr_name}' timed out after {job_timeout_seconds} seconds. Last status: {final_job_status}"
                    )

                time.sleep(job_polling_interval_seconds)
-
-            print(f"RayJob '{actual_job_cr_name}' finished with status: {final_job_status}")
+
+            print(
+                f"RayJob '{actual_job_cr_name}' finished with status: {final_job_status}"
+            )
        else:
            try:
                ray_job_status_cr = k8s_co_api.get_namespaced_custom_object_status(
-                    group="ray.io", version="v1", namespace=namespace, plural="rayjobs", name=actual_job_cr_name
+                    group="ray.io",
+                    version="v1",
+                    namespace=namespace,
+                    plural="rayjobs",
+                    name=actual_job_cr_name,
                )
                status_field = ray_job_status_cr.get("status", {})
                final_job_status = status_field.get("jobStatus", "SUBMITTED")
-                returned_job_submission_id = status_field.get("jobId", job_config.submission_id)
+                returned_job_submission_id = status_field.get(
+                    "jobId", job_config.submission_id
+                )
                dashboard_url = status_field.get("dashboardURL", dashboard_url)
-                ray_cluster_name_actual = status_field.get("rayClusterName", ray_cluster_name_actual)
+                ray_cluster_name_actual = status_field.get(
+                    "rayClusterName", ray_cluster_name_actual
+                )
            except ApiException as e:
                if e.status == 404:
                    final_job_status = "SUBMITTED_NOT_FOUND"
                else:
-                    print(f"Warning: Could not fetch initial status for RayJob '{actual_job_cr_name}': {e}")
+                    print(
+                        f"Warning: Could not fetch initial status for RayJob '{actual_job_cr_name}': {e}"
+                    )
                    final_job_status = "UNKNOWN_API_ERROR"

        return {
@@ -825,20 +875,30 @@ def run_job_with_managed_cluster(
        }

    except ApiException as e:
-        print(f"Kubernetes API error during RayJob '{actual_job_cr_name}' management: {e.reason} (status: {e.status})")
+        print(
+            f"Kubernetes API error during RayJob '{actual_job_cr_name}' management: {e.reason} (status: {e.status})"
+        )
        final_status_on_error = "ERROR_BEFORE_SUBMISSION"
        if actual_job_cr_name:
            try:
                ray_job_status_cr = k8s_co_api.get_namespaced_custom_object_status(
-                    group="ray.io", version="v1", namespace=namespace, plural="rayjobs", name=actual_job_cr_name
+                    group="ray.io",
+                    version="v1",
+                    namespace=namespace,
+                    plural="rayjobs",
+                    name=actual_job_cr_name,
                )
                status_field = ray_job_status_cr.get("status", {})
-                final_status_on_error = status_field.get("jobStatus", "UNKNOWN_AFTER_K8S_ERROR")
+                final_status_on_error = status_field.get(
+                    "jobStatus", "UNKNOWN_AFTER_K8S_ERROR"
+                )
            except Exception:
                final_status_on_error = "UNKNOWN_FINAL_STATUS_FETCH_FAILED"
        raise
    except Exception as e:
-        print(f"An unexpected error occurred during managed RayJob execution for '{actual_job_cr_name}': {e}")
+        print(
+            f"An unexpected error occurred during managed RayJob execution for '{actual_job_cr_name}': {e}"
+        )
        raise
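The loop reformatted above boils down to polling the RayJob custom resource's `status` subresource until `jobStatus` reaches a terminal state. For reference, a minimal standalone sketch of that pattern with the official `kubernetes` Python client is shown below; the CR name, namespace, and sleep interval are placeholders, not values from this change.

```python
# Sketch only: polls a RayJob CR's status subresource until the job finishes.
# Assumes the `kubernetes` package is installed and a kubeconfig is available.
import time

from kubernetes import client, config

config.load_kube_config()
co_api = client.CustomObjectsApi()

name, namespace = "rayjob-example", "default"  # placeholders, not from this PR

while True:
    cr = co_api.get_namespaced_custom_object_status(
        group="ray.io",
        version="v1",
        namespace=namespace,
        plural="rayjobs",
        name=name,
    )
    status = cr.get("status", {})
    job_status = status.get("jobStatus", "PENDING")
    print(job_status, status.get("jobDeploymentStatus", "UNKNOWN"))
    if job_status in ["SUCCEEDED", "FAILED", "STOPPED"]:
        break
    time.sleep(5)  # polling interval; the function uses job_polling_interval_seconds
```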
@@ -999,8 +1059,10 @@ def get_cluster(
    )
    # 1. Prepare RayClusterSpec from ClusterConfiguration
    # Create a temporary config with appwrapper=False to ensure build_ray_cluster returns RayCluster YAML
-    temp_cluster_config_dict = cluster_config.dict(exclude_none=True)  # Assuming Pydantic V1 or similar .dict() method
-    temp_cluster_config_dict['appwrapper'] = False
+    temp_cluster_config_dict = cluster_config.dict(
+        exclude_none=True
+    )  # Assuming Pydantic V1 or similar .dict() method
+    temp_cluster_config_dict["appwrapper"] = False
    temp_cluster_config_for_spec = ClusterConfiguration(**temp_cluster_config_dict)
    # Ignore the warning here for the lack of a ClusterConfiguration
    with warnings.catch_warnings():
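The `get_cluster` hunk above relies on a dump-tweak-rebuild round trip of the ClusterConfiguration model. A generic illustration of that pattern with a stand-in Pydantic V1 model (not the real `ClusterConfiguration` class) looks like this:

```python
# Sketch only: serialize a Pydantic V1 model, override one field, rebuild it.
from pydantic import BaseModel


class DemoConfig(BaseModel):  # stand-in for ClusterConfiguration
    name: str
    appwrapper: bool = True


cfg = DemoConfig(name="demo")
data = cfg.dict(exclude_none=True)  # Pydantic V1-style serialization
data["appwrapper"] = False          # force RayCluster YAML instead of AppWrapper
rebuilt = DemoConfig(**data)
assert rebuilt.appwrapper is False
```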