Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: win5923 <[email protected]>
  • Loading branch information
win5923 committed Feb 17, 2025
1 parent a673e44 commit 73c9099
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 34 deletions.
46 changes: 12 additions & 34 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
Expand Down Expand Up @@ -322,42 +321,21 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor

// Continuously checks for Kubernetes events related to the RayJobDeletionPolicy.
// If an event indicates that the RayJobDeletionPolicy feature gate must be enabled, throw an error and delete the RayJob.
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-time.After(clusterTimeout * time.Second):
return
case <-ticker.C:
eventList, err := k8sClients.KubernetesClient().CoreV1().Events(*options.configFlags.Namespace).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s", options.RayJob.GetName()),
})
if err != nil {
fmt.Printf("Error listing events: %v\n", err)
return
}
if options.deletionPolicy != "" {
waitCtx, waitCancel := context.WithTimeout(ctx, 30*time.Second)
defer waitCancel()

// Check for error events related to RayJobDeletionPolicy feature gate
for _, event := range eventList.Items {
if strings.Contains(event.Message, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") {
if event.FirstTimestamp.Time.After(startTime) || event.LastTimestamp.Time.After(startTime) {
fmt.Printf("Deleting RayJob...\n")
err = k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
if err != nil {
fmt.Printf("Failed to clean up Ray job: %v\n", err)
} else {
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
}
log.Fatalf("%s", event.Message)
}
}
}
return
err = k8sClients.WaitRayJobDeletionPolicyEnabled(waitCtx, *options.configFlags.Namespace, options.RayJob.Name, startTime, clusterTimeout)
if err != nil {
fmt.Printf("Deleting RayJob...\n")
deleteErr := k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
if deleteErr != nil {
return fmt.Errorf("Failed to clean up Ray job after time out.: %w", deleteErr)
}
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
return fmt.Errorf("%w", err)
}
}()
}

if len(options.RayJob.GetName()) > 0 {
// Add timeout?
Expand Down
5 changes: 5 additions & 0 deletions kubectl-plugin/pkg/cmd/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -105,6 +106,10 @@ func (c fakeClient) RayClient() rayclient.Interface {
return nil
}

// WaitRayJobDeletionPolicyEnabled is a test stub: it ignores every argument
// and immediately reports success without contacting any cluster.
func (c fakeClient) WaitRayJobDeletionPolicyEnabled(
	_ context.Context,
	_ string,
	_ string,
	_ time.Time,
	_ time.Duration,
) error {
	return nil
}

// Tests the Run() step of the command and checks the output.
func TestRayVersionRun(t *testing.T) {
testContext := "test-context"
Expand Down
42 changes: 42 additions & 0 deletions kubectl-plugin/pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"fmt"
"strings"
"time"

dockerparser "github.com/novln/docker-parser"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

Expand All @@ -20,6 +23,7 @@ type Client interface {
// GetRayHeadSvcName retrieves the name of RayHead service for the given RayCluster, RayJob, or RayService.
GetRayHeadSvcName(ctx context.Context, namespace string, resourceType util.ResourceType, name string) (string, error)
GetKubeRayOperatorVersion(ctx context.Context) (string, error)
WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error
}

type k8sClient struct {
Expand Down Expand Up @@ -141,3 +145,41 @@ func (c *k8sClient) getRayHeadSvcNameByRayService(ctx context.Context, namespace
svcName := rayService.Status.ActiveServiceStatus.RayClusterStatus.Head.ServiceName
return svcName, nil
}

// WaitRayJobDeletionPolicyEnabled watches the Events in namespace whose
// involvedObject.name equals name and blocks until one of the following occurs:
//
//   - an event created or last seen after startTime carries the
//     "RayJobDeletionPolicy feature gate must be enabled" message; that message
//     is returned as the error;
//   - the watch delivers a watch.Error event; it is returned as an error;
//   - the timeout elapses; a "timed out" error is returned;
//   - ctx is cancelled; the context error is returned, wrapped.
//
// Every return path yields a non-nil error, so callers should inspect the
// error rather than only test it against nil.
//
// timeout is multiplied by time.Second below, i.e. it is interpreted as a bare
// number of seconds rather than as an already-scaled duration.
func (c *k8sClient) WaitRayJobDeletionPolicyEnabled(ctx context.Context, namespace, name string, startTime time.Time, timeout time.Duration) error {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout*time.Second)
	defer cancel()

	// The deferred Stop below tears the watch down on every return path,
	// including the timeout one.
	watcher, err := c.KubernetesClient().CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("involvedObject.name=%s", name),
	})
	if err != nil {
		return fmt.Errorf("failed to watch events for RayJob %s in namespace %s: %w", name, namespace, err)
	}
	defer watcher.Stop()

	results := watcher.ResultChan()
	for {
		select {
		case <-timeoutCtx.Done():
			if timeoutCtx.Err() == context.DeadlineExceeded {
				return fmt.Errorf("timed out waiting for RayJobDeletionPolicy feature gate to be enabled")
			}
			// Cancelled rather than timed out. Done() stays closed, so we must
			// return here; falling through would spin in this loop forever.
			return fmt.Errorf("stopped waiting for RayJobDeletionPolicy events: %w", timeoutCtx.Err())
		case event, open := <-results:
			if !open {
				// A closed channel yields zero-value events without blocking.
				// Receiving from a nil channel blocks forever, so disabling
				// this case leaves only the context to end the wait instead of
				// busy-looping until the deadline.
				results = nil
				continue
			}

			if event.Type == watch.Error {
				return fmt.Errorf("error watching events: %v", event.Object)
			}

			e, ok := event.Object.(*corev1.Event)
			if !ok {
				continue
			}

			// Skip events that predate this submission: a leftover event from
			// an earlier RayJob with the same name must not trigger a failure.
			if strings.Contains(e.Message, "RayJobDeletionPolicy feature gate must be enabled to use the DeletionPolicy feature") {
				if e.FirstTimestamp.Time.After(startTime) || e.LastTimestamp.Time.After(startTime) {
					return fmt.Errorf("%s", e.Message)
				}
			}
		}
	}
}

0 comments on commit 73c9099

Please sign in to comment.