Skip to content

Commit 8cec9fa

Browse files
committed
Refactor
Signed-off-by: win5923 <[email protected]>
1 parent e9cf5d3 commit 8cec9fa

File tree

3 files changed

+57
-40
lines changed

3 files changed

+57
-40
lines changed

kubectl-plugin/pkg/cmd/job/job_submit.go

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"time"
1515

1616
"k8s.io/apimachinery/pkg/api/meta"
17-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1817
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1918
"k8s.io/cli-runtime/pkg/genericclioptions"
2019
"k8s.io/cli-runtime/pkg/genericiooptions"
@@ -34,9 +33,10 @@ import (
3433
)
3534

3635
const (
37-
dashboardAddr = "http://localhost:8265"
38-
clusterTimeout = 120.0
39-
portforwardtimeout = 60.0
36+
dashboardAddr = "http://localhost:8265"
37+
clusterTimeout = 120.0
38+
portforwardtimeout = 60.0
39+
rayjobDeletionTimeout = 30.0
4040
)
4141

4242
type SubmitJobOptions struct {
@@ -336,44 +336,18 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
336336
}
337337
fmt.Printf("Submitted RayJob %s.\n", options.RayJob.GetName())
338338

339-
// Continuously checks for Kubernetes events related to the RayJobDeletionPolicy.
340-
// If an event indicates that the RayJobDeletionPolicy feature gate must be enabled, throw an error and delete the RayJob.
341-
go func() {
342-
ticker := time.NewTicker(5 * time.Second)
343-
defer ticker.Stop()
344-
345-
for {
346-
select {
347-
case <-time.After(clusterTimeout * time.Second):
348-
return
349-
case <-ticker.C:
350-
eventList, err := k8sClients.KubernetesClient().CoreV1().Events(*options.configFlags.Namespace).List(ctx, metav1.ListOptions{
351-
FieldSelector: fmt.Sprintf("involvedObject.name=%s", options.RayJob.GetName()),
352-
})
353-
if err != nil {
354-
fmt.Printf("Error listing events: %v\n", err)
355-
return
356-
}
357-
358-
// Check for error events related to RayJobDeletionPolicy feature gate
359-
for _, event := range eventList.Items {
360-
if strings.Contains(event.Message, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") {
361-
if event.FirstTimestamp.Time.After(startTime) || event.LastTimestamp.Time.After(startTime) {
362-
fmt.Printf("Deleting RayJob...\n")
363-
err = k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
364-
if err != nil {
365-
fmt.Printf("Failed to clean up Ray job: %v\n", err)
366-
} else {
367-
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
368-
}
369-
log.Fatalf("%s", event.Message)
370-
}
371-
}
372-
}
373-
return
339+
if options.deletionPolicy != "" {
340+
err = k8sClients.WaitRayJobDeletionPolicyEnabled(ctx, *options.configFlags.Namespace, options.RayJob.Name, startTime, rayjobDeletionTimeout)
341+
if err != nil {
342+
fmt.Printf("Deleting RayJob...\n")
343+
deleteErr := k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
344+
if deleteErr != nil {
345+
return fmt.Errorf("Failed to clean up Ray job after time out.: %w", deleteErr)
374346
}
347+
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
348+
return fmt.Errorf("%w", err)
375349
}
376-
}()
350+
}
377351

378352
if len(options.RayJob.GetName()) > 0 {
379353
// Add timeout?

kubectl-plugin/pkg/cmd/version/version_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ func (c fakeClient) RayClient() rayclient.Interface {
110110
return nil
111111
}
112112

113+
// WaitRayJobDeletionPolicyEnabled is a test stub: it ignores every argument,
// performs no waiting, and always returns nil.
// NOTE(review): presumably added so fakeClient keeps satisfying the client
// interface after this method was introduced there — confirm against client.go.
func (c fakeClient) WaitRayJobDeletionPolicyEnabled(_ context.Context, _, _ string, _ time.Time, _ time.Duration) error {
114+
return nil
115+
}
116+
113117
// Tests the Run() step of the command and checks the output.
114118
func TestRayVersionRun(t *testing.T) {
115119
testContext := "test-context"

kubectl-plugin/pkg/util/client/client.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
dockerparser "github.com/novln/docker-parser"
1010
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
11+
corev1 "k8s.io/api/core/v1"
1112
"k8s.io/apimachinery/pkg/api/meta"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/apimachinery/pkg/fields"
@@ -25,6 +26,7 @@ type Client interface {
2526
// GetRayHeadSvcName retrieves the name of RayHead service for the given RayCluster, RayJob, or RayService.
2627
GetRayHeadSvcName(ctx context.Context, namespace string, resourceType util.ResourceType, name string) (string, error)
2728
GetKubeRayOperatorVersion(ctx context.Context) (string, error)
29+
WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error
2830
WaitRayClusterProvisioned(ctx context.Context, namespace, name string, timeout time.Duration) error
2931
}
3032

@@ -185,3 +187,40 @@ func (c *k8sClient) getRayHeadSvcNameByRayService(ctx context.Context, namespace
185187
svcName := rayService.Status.ActiveServiceStatus.RayClusterStatus.Head.ServiceName
186188
return svcName, nil
187189
}
190+
191+
// WaitRayJobDeletionPolicyEnabled blocks until an event indicates that the RayJobDeletionPolicy feature gate must be enabled.
192+
func (c *k8sClient) WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error {
193+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout*time.Second)
194+
defer cancel()
195+
196+
watcher, err := c.KubernetesClient().CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{
197+
FieldSelector: fmt.Sprintf("involvedObject.name=%s", name),
198+
})
199+
if err != nil {
200+
return fmt.Errorf("failed to watch events for RayJob %s in namespace %s: %w", name, namespace, err)
201+
}
202+
defer watcher.Stop()
203+
204+
for {
205+
select {
206+
case <-timeoutCtx.Done():
207+
// If the RayJobDeletionPolicy feature event is not received within the timeout period, it is considered enabled.
208+
return nil
209+
case event := <-watcher.ResultChan():
210+
if event.Type == watch.Error {
211+
return fmt.Errorf("error watching events: %v", event.Object)
212+
}
213+
214+
e, ok := event.Object.(*corev1.Event)
215+
if !ok {
216+
continue
217+
}
218+
219+
if strings.Contains(e.Message, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") {
220+
if e.FirstTimestamp.Time.After(startTime) || e.LastTimestamp.Time.After(startTime) {
221+
return fmt.Errorf("%s", e.Message)
222+
}
223+
}
224+
}
225+
}
226+
}

0 commit comments

Comments
 (0)