Skip to content

Commit 37ce35b

Browse files
authored
Detect missing Component and transition to Failed state (#141)
1 parent a54cad2 commit 37ce35b

File tree

3 files changed

+72
-9
lines changed

3 files changed

+72
-9
lines changed

internal/controller/appwrapper/appwrapper_controller.go

+51-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
v1 "k8s.io/api/core/v1"
26+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2627
"k8s.io/apimachinery/pkg/api/meta"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829
"k8s.io/apimachinery/pkg/runtime"
@@ -61,6 +62,11 @@ type podStatusSummary struct {
6162
failed int32
6263
}
6364

65+
// componentStatusSummary tallies the components of an AppWrapper at the
// granularity of whole wrapped objects (as opposed to podStatusSummary,
// which tallies individual Pods).
type componentStatusSummary struct {
	// expected is the number of entries recorded in the AppWrapper's
	// Status.ComponentStatus; deployed is how many of those entries were
	// successfully fetched from the cluster.
	expected, deployed int32
}
69+
6470
// permission to fully control appwrappers
6571
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers,verbs=get;list;watch;create;update;patch;delete
6672
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch
@@ -202,7 +208,26 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
202208
if aw.Spec.Suspend {
203209
return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // begin undeployment
204210
}
205-
podStatus, err := r.workloadStatus(ctx, aw)
211+
212+
// First, check the Component-level status of the workload
213+
compStatus, err := r.getComponentStatus(ctx, aw)
214+
if err != nil {
215+
return ctrl.Result{}, err
216+
}
217+
218+
// Detect externally deleted components and transition to Failed with no GracePeriod or retry
219+
if compStatus.deployed != compStatus.expected {
220+
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
221+
Type: string(workloadv1beta2.Unhealthy),
222+
Status: metav1.ConditionTrue,
223+
Reason: "MissingComponent",
224+
Message: fmt.Sprintf("Only found %v deployed components, but was expecting %v", compStatus.deployed, compStatus.expected),
225+
})
226+
return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
227+
}
228+
229+
// Second, check the Pod-level status of the workload
230+
podStatus, err := r.getPodStatus(ctx, aw)
206231
if err != nil {
207232
return ctrl.Result{}, err
208233
}
@@ -430,7 +455,7 @@ func (r *AppWrapperReconciler) resetOrFail(ctx context.Context, aw *workloadv1be
430455
}
431456
}
432457

433-
func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*podStatusSummary, error) {
458+
func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*podStatusSummary, error) {
434459
pods := &v1.PodList{}
435460
if err := r.List(ctx, pods,
436461
client.InNamespace(aw.Namespace),
@@ -455,6 +480,30 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv
455480
return summary, nil
456481
}
457482

483+
func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*componentStatusSummary, error) {
484+
summary := &componentStatusSummary{expected: int32(len(aw.Status.ComponentStatus))}
485+
486+
for componentIdx := range aw.Status.ComponentStatus {
487+
cs := &aw.Status.ComponentStatus[componentIdx]
488+
obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}}
489+
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
490+
summary.deployed += 1
491+
} else {
492+
if apierrors.IsNotFound(err) {
493+
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
494+
Type: string(workloadv1beta2.Unhealthy),
495+
Status: metav1.ConditionTrue,
496+
Reason: "ComponentNotFound",
497+
})
498+
} else {
499+
return nil, err
500+
}
501+
}
502+
}
503+
504+
return summary, nil
505+
}
506+
458507
func (r *AppWrapperReconciler) limitDuration(desired time.Duration) time.Duration {
459508
if desired < 0 {
460509
return 0 * time.Second

internal/controller/appwrapper/appwrapper_controller_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ var _ = Describe("AppWrapper Controller", func() {
104104
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse())
105105
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
106106
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
107-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
107+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
108108
Expect(err).NotTo(HaveOccurred())
109109
Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw)))
110110

@@ -121,7 +121,7 @@ var _ = Describe("AppWrapper Controller", func() {
121121
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse())
122122
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
123123
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
124-
podStatus, err = awReconciler.workloadStatus(ctx, aw)
124+
podStatus, err = awReconciler.getPodStatus(ctx, aw)
125125
Expect(err).NotTo(HaveOccurred())
126126
Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw) - 1))
127127
}
@@ -142,7 +142,7 @@ var _ = Describe("AppWrapper Controller", func() {
142142
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
143143
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
144144
Expect((*workload.AppWrapper)(aw).PodsReady()).Should(BeTrue())
145-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
145+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
146146
Expect(err).NotTo(HaveOccurred())
147147
Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw)))
148148
_, finished := (*workload.AppWrapper)(aw).Finished()
@@ -161,7 +161,7 @@ var _ = Describe("AppWrapper Controller", func() {
161161
_, err = awReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: awName}) // see deletion has completed
162162
Expect(err).NotTo(HaveOccurred())
163163

164-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
164+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
165165
Expect(err).NotTo(HaveOccurred())
166166
Expect(podStatus.failed + podStatus.succeeded + podStatus.running + podStatus.pending).Should(Equal(int32(0)))
167167
})
@@ -184,7 +184,7 @@ var _ = Describe("AppWrapper Controller", func() {
184184
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.QuotaReserved))).Should(BeTrue())
185185
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
186186
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
187-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
187+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
188188
Expect(err).NotTo(HaveOccurred())
189189
Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw) - 1))
190190
Expect(podStatus.succeeded).Should(Equal(int32(1)))
@@ -239,7 +239,7 @@ var _ = Describe("AppWrapper Controller", func() {
239239
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.QuotaReserved))).Should(BeFalse())
240240
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeFalse())
241241
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeTrue())
242-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
242+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
243243
Expect(err).NotTo(HaveOccurred())
244244
Expect(podStatus.failed + podStatus.succeeded + podStatus.running + podStatus.pending).Should(Equal(int32(0)))
245245
})
@@ -296,7 +296,7 @@ var _ = Describe("AppWrapper Controller", func() {
296296
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.PodsReady))).Should(BeFalse())
297297
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
298298
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
299-
podStatus, err := awReconciler.workloadStatus(ctx, aw)
299+
podStatus, err := awReconciler.getPodStatus(ctx, aw)
300300
Expect(err).NotTo(HaveOccurred())
301301
Expect(podStatus.pending).Should(Equal(int32(1)))
302302
})

test/e2e/appwrapper_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
. "github.com/onsi/ginkgo/v2"
2626
. "github.com/onsi/gomega"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
2829
"k8s.io/apimachinery/pkg/types"
2930
"k8s.io/apimachinery/pkg/util/wait"
@@ -276,6 +277,19 @@ var _ = Describe("AppWrapper E2E Test", func() {
276277
Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
277278
Eventually(AppWrapperPhase(ctx, aw), 90*time.Second).Should(Equal(workloadv1beta2.AppWrapperFailed))
278279
})
280+
281+
// Verifies that deleting one of a Running AppWrapper's components out from
// under it drives the AppWrapper to the Failed phase.
It("Deleting a Running Component yields a failed AppWrapper", func() {
	aw := createAppWrapper(ctx, pytorchjob(2, 500))
	appwrappers = append(appwrappers, aw)
	Eventually(AppWrapperPhase(ctx, aw), 60*time.Second).Should(Equal(workloadv1beta2.AppWrapperRunning))

	// Fetch the AppWrapper again by name before reading Status.ComponentStatus
	// (presumably so the status written by the controller is visible here).
	aw = getAppWrapper(ctx, types.NamespacedName{Name: aw.Name, Namespace: aw.Namespace})

	// Guard the index below: an empty status should surface as a readable
	// assertion failure, not an index-out-of-range panic.
	Expect(aw.Status.ComponentStatus).ShouldNot(BeEmpty())
	component := aw.Status.ComponentStatus[0]

	// Only identifying metadata is needed to issue the delete.
	toDelete := &metav1.PartialObjectMetadata{
		TypeMeta:   metav1.TypeMeta{Kind: component.Kind, APIVersion: component.APIVersion},
		ObjectMeta: metav1.ObjectMeta{Name: component.Name, Namespace: aw.Namespace},
	}
	Expect(getClient(ctx).Delete(ctx, toDelete)).Should(Succeed())
	Eventually(AppWrapperPhase(ctx, aw), 60*time.Second).Should(Equal(workloadv1beta2.AppWrapperFailed))
})
279293
})
280294

281295
Describe("Load Testing", Label("slow"), Label("Kueue", "Standalone"), func() {

0 commit comments

Comments
 (0)