Commit 2d88d63

[Refactor][GCS FT] Use DeleteAllOf to delete cluster pods before cleaning up redis (#1785)

rueian authored Jan 2, 2024
1 parent af4f6ac commit 2d88d63
Showing 2 changed files with 93 additions and 50 deletions.
80 changes: 30 additions & 50 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -170,6 +170,23 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Request
     return ctrl.Result{}, client.IgnoreNotFound(err)
 }
 
+func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace string, filterLabels client.MatchingLabels) (active int, pods corev1.PodList, err error) {
+    if err = r.List(ctx, &pods, client.InNamespace(namespace), filterLabels); err != nil {
+        return 0, pods, err
+    }
+    active = 0
+    for _, pod := range pods.Items {
+        if pod.DeletionTimestamp.IsZero() {
+            active++
+        }
+    }
+    if active > 0 {
+        r.Log.Info(fmt.Sprintf("Deleting all pods with labels %v in %q namespace.", filterLabels, namespace))
+        return active, pods, r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(namespace), filterLabels)
+    }
+    return active, pods, nil
+}
+
 func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
     // Please do NOT modify `originalRayClusterInstance` in the following code.
     originalRayClusterInstance := instance.DeepCopy()
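For readers unfamiliar with the controller-runtime call this refactor leans on: `client.DeleteAllOf` sends a single delete-collection request to the API server for every object matching the given options, replacing the list-then-delete-one-by-one loops removed in the next hunk. Returning the listed Pods plus the count of still-active ones lets callers tell whether a deletion was actually triggered. A minimal, self-contained sketch of the call; the `demo` namespace and `app=demo` label are illustrative, not from the commit:

```go
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	// Build a client from the ambient kubeconfig or in-cluster config.
	cfg, err := config.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := client.New(cfg, client.Options{})
	if err != nil {
		panic(err)
	}
	// One API call deletes every Pod in "demo" labeled app=demo, instead
	// of listing them and issuing one Delete call per Pod.
	if err := c.DeleteAllOf(context.Background(), &corev1.Pod{},
		client.InNamespace("demo"), client.MatchingLabels{"app": "demo"}); err != nil {
		panic(err)
	}
}
```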
@@ -204,49 +221,21 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request
         "DeletionTimestamp", instance.ObjectMeta.DeletionTimestamp)
 
     // Delete the head Pod if it exists.
-    headPods := corev1.PodList{}
-    filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
-    if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
+    numDeletedHeads, headPods, err := r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
+        utils.RayClusterLabelKey:  instance.Name,
+        utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
+    })
+    if err != nil {
         return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
     }
 
-    for _, headPod := range headPods.Items {
-        if !headPod.DeletionTimestamp.IsZero() {
-            r.Log.Info(fmt.Sprintf("The head Pod %s is already being deleted. Skip deleting this Pod.", headPod.Name))
-            continue
-        }
-        r.Log.Info(fmt.Sprintf(
-            "Delete the head Pod %s before the Redis cleanup. "+
-                "The storage namespace %s in Redis cannot be fully deleted if the GCS process on the head Pod is still writing to it.",
-            headPod.Name, headPod.Annotations[utils.RayExternalStorageNSAnnotationKey]))
-        if err := r.Delete(ctx, &headPod); err != nil {
-            return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
-        }
-    }
-
     // Delete all worker Pods if they exist.
-    for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
-        workerPods := corev1.PodList{}
-        filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeGroupLabelKey: workerGroup.GroupName}
-        if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
-            return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
-        }
-
-        for _, workerPod := range workerPods.Items {
-            if !workerPod.DeletionTimestamp.IsZero() {
-                r.Log.Info(fmt.Sprintf("The worker Pod %s is already being deleted. Skip deleting this Pod.", workerPod.Name))
-                continue
-            }
-            r.Log.Info(fmt.Sprintf(
-                "Delete the worker Pod %s. This step isn't necessary for initiating the Redis cleanup process.", workerPod.Name))
-            if err := r.Delete(ctx, &workerPod); err != nil {
-                return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
-            }
-        }
+    if _, _, err = r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
+        utils.RayClusterLabelKey:  instance.Name,
+        utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
+    }); err != nil {
+        return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
     }
 
     // If the number of head Pods is not 0, wait for it to be terminated before initiating the Redis cleanup process.
-    if len(headPods.Items) != 0 {
+    if numDeletedHeads > 0 {
         r.Log.Info(fmt.Sprintf(
             "Wait for the head Pod %s to be terminated before initiating the Redis cleanup process. "+
                 "The storage namespace %s in Redis cannot be fully deleted if the GCS process on the head Pod is still writing to it.",
@@ -256,7 +245,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request
     }
 
     // We can start the Redis cleanup process now because the head Pod has been terminated.
-    filterLabels = client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode)}
+    filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode)}
     redisCleanupJobs := batchv1.JobList{}
     if err := r.List(ctx, &redisCleanupJobs, client.InNamespace(instance.Namespace), filterLabels); err != nil {
         return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
@@ -583,16 +572,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster
     // if RayCluster is suspended, delete all pods and skip reconcile
     if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
         clusterLabel := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
-        allPods := corev1.PodList{}
-        if err := r.List(ctx, &allPods, client.InNamespace(instance.Namespace), clusterLabel); err != nil {
-            return err
-        }
-
-        if len(allPods.Items) == 0 {
-            return nil
-        }
-
-        if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), clusterLabel); err != nil {
+        if _, _, err := r.deleteAllPods(ctx, instance.Namespace, clusterLabel); err != nil {
             return err
         }
 
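The suspend branch now goes through the same helper. The old early return for an empty Pod list is no longer needed, because `deleteAllPods` only issues the `DeleteAllOf` call when at least one matching Pod is still active. For context, a hypothetical caller-side sketch (not part of the commit; the import path is the one KubeRay v1.x uses) of how a cluster reaches this path:

```go
package sketch

import (
	"context"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// suspendCluster flips spec.suspend on a RayCluster; on the next reconcile,
// the branch above deletes all of the cluster's Pods via deleteAllPods.
func suspendCluster(ctx context.Context, c client.Client, cluster *rayv1.RayCluster) error {
	suspend := true
	cluster.Spec.Suspend = &suspend
	return c.Update(ctx, cluster)
}
```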
63 changes: 63 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
@@ -2380,3 +2380,66 @@ func TestSumGPUs(t *testing.T) {
         })
     }
 }
+
+func TestDeleteAllPods(t *testing.T) {
+    newScheme := runtime.NewScheme()
+    _ = corev1.AddToScheme(newScheme)
+    ns := "tmp-ns"
+    ts := metav1.Now()
+    filter := map[string]string{"app": "tmp"}
+
+    p1 := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "alive",
+            Namespace: ns,
+            Labels:    filter,
+        },
+    }
+    p2 := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:              "deleted",
+            Namespace:         ns,
+            Labels:            filter,
+            DeletionTimestamp: &ts,
+            Finalizers:        []string{"tmp"},
+        },
+    }
+    p3 := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "other",
+            Namespace: ns,
+            Labels:    map[string]string{"app": "other"},
+        },
+    }
+
+    fakeClient := clientFake.NewClientBuilder().
+        WithScheme(newScheme).
+        WithRuntimeObjects(p1, p2, p3).
+        Build()
+
+    testRayClusterReconciler := &RayClusterReconciler{
+        Client:   fakeClient,
+        Recorder: &record.FakeRecorder{},
+        Scheme:   newScheme,
+        Log:      ctrl.Log.WithName("controllers"),
+    }
+    ctx := context.Background()
+    // The first `deleteAllPods` function call should delete the "alive" Pod.
+    active, pods, err := testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
+    assert.Nil(t, err)
+    assert.Equal(t, 1, active)
+    assert.Equal(t, 2, len(pods.Items))
+    assert.Subset(t, []string{"alive", "deleted"}, []string{pods.Items[0].Name, pods.Items[1].Name})
+    // The second `deleteAllPods` function call should delete no Pods because none are active.
+    active, pods, err = testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
+    assert.Nil(t, err)
+    assert.Equal(t, 0, active)
+    assert.Equal(t, 1, len(pods.Items))
+    assert.Equal(t, "deleted", pods.Items[0].Name)
+    // Make sure that the above `deleteAllPods` calls didn't remove other Pods.
+    pods = corev1.PodList{}
+    err = fakeClient.List(ctx, &pods, client.InNamespace(ns))
+    assert.Nil(t, err)
+    assert.Equal(t, 2, len(pods.Items))
+    assert.Subset(t, []string{"deleted", "other"}, []string{pods.Items[0].Name, pods.Items[1].Name})
+}
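One subtlety in the assertions above: the "deleted" Pod survives both calls because its "tmp" finalizer keeps it in the terminating state (non-zero `DeletionTimestamp`), so `deleteAllPods` counts it as inactive, and once no matching Pod is active the helper skips `DeleteAllOf` entirely. A hypothetical continuation of the test, assuming a controller-runtime fake client recent enough to honor finalizers:

```go
	// Hypothetical continuation (not in the commit): clearing the finalizer
	// lets the pending deletion complete, so a final List would return only
	// the "other" Pod. Exact behavior depends on the fake client version.
	var terminating corev1.Pod
	err = fakeClient.Get(ctx, client.ObjectKey{Namespace: ns, Name: "deleted"}, &terminating)
	assert.Nil(t, err)
	terminating.Finalizers = nil
	err = fakeClient.Update(ctx, &terminating)
	assert.Nil(t, err)
```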
