
Commit 462f97e

Implement retries on failure (#64)
1 parent bde2de5 commit 462f97e

13 files changed: +286 -32 lines changed


api/v1beta2/appwrapper_types.go

+12

@@ -66,6 +66,9 @@ type AppWrapperStatus struct {
 	// Phase of the AppWrapper object
 	Phase AppWrapperPhase `json:"phase,omitempty"`
 
+	// Retries counts the number of times the AppWrapper has entered the Resetting Phase
+	Retries int32 `json:"resettingCount,omitempty"`
+
 	// Conditions
 	Conditions []metav1.Condition `json:"conditions,omitempty"`
 }
@@ -78,6 +81,7 @@ const (
 	AppWrapperSuspended AppWrapperPhase = "Suspended"
 	AppWrapperResuming AppWrapperPhase = "Resuming"
 	AppWrapperRunning AppWrapperPhase = "Running"
+	AppWrapperResetting AppWrapperPhase = "Resetting"
 	AppWrapperSuspending AppWrapperPhase = "Suspending"
 	AppWrapperSucceeded AppWrapperPhase = "Succeeded"
 	AppWrapperFailed AppWrapperPhase = "Failed"
@@ -90,6 +94,14 @@ const (
 	QuotaReserved AppWrapperCondition = "QuotaReserved"
 	ResourcesDeployed AppWrapperCondition = "ResourcesDeployed"
 	PodsReady AppWrapperCondition = "PodsReady"
+	Unhealthy AppWrapperCondition = "Unhealthy"
+)
+
+const (
+	WarmupGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/warmupGracePeriodDuration"
+	FailureGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/failureGracePeriodDuration"
+	ResetPauseDurationAnnotation = "workload.codeflare.dev.appwrapper/resetPauseDuration"
+	RetryLimitAnnotation = "workload.codeflare.dev.appwrapper/retryLimit"
 )
 
 //+kubebuilder:object:root=true
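
The four annotation constants above give users a per-AppWrapper override for the controller-level retry defaults. Below is a minimal sketch of how a client might set them; it is not part of this commit, the module import path is assumed, and the duration/limit values are illustrative only.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
)

func main() {
	// Hypothetical AppWrapper whose retry behavior is tuned via the new annotations;
	// the values ("1m", "90s", "5") are examples, not project defaults.
	aw := &workloadv1beta2.AppWrapper{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "sample",
			Namespace: "default",
			Annotations: map[string]string{
				workloadv1beta2.FailureGracePeriodDurationAnnotation: "1m",
				workloadv1beta2.ResetPauseDurationAnnotation:         "90s",
				workloadv1beta2.RetryLimitAnnotation:                 "5",
			},
		},
	}
	fmt.Println(aw.Annotations)
}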

cmd/standalone/main.go

+4 -2

@@ -60,7 +60,9 @@ func main() {
 	var secureMetrics bool
 	var enableHTTP2 bool
 
-	awConfig := config.AppWrapperConfig{StandaloneMode: true, ManageJobsWithoutQueueName: false}
+	awConfig := config.NewConfig()
+	awConfig.StandaloneMode = true
+	awConfig.ManageJobsWithoutQueueName = false
 
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -130,7 +132,7 @@ func main() {
 	}
 
 	ctx := ctrl.SetupSignalHandler()
-	err = controller.SetupWithManager(ctx, mgr, &awConfig)
+	err = controller.SetupWithManager(ctx, mgr, awConfig)
 	if err != nil {
 		setupLog.Error(err, "unable to start appwrapper controllers")
 		os.Exit(1)

cmd/unified/main.go

+2 -2

@@ -62,7 +62,7 @@ func main() {
 	var secureMetrics bool
 	var enableHTTP2 bool
 
-	awConfig := config.AppWrapperConfig{StandaloneMode: false}
+	awConfig := config.NewConfig()
 
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
 	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -133,7 +133,7 @@ func main() {
 	}
 
 	ctx := ctrl.SetupSignalHandler()
-	err = controller.SetupWithManager(ctx, mgr, &awConfig)
+	err = controller.SetupWithManager(ctx, mgr, awConfig)
 	if err != nil {
 		setupLog.Error(err, "unable to start appwrapper controllers")
 		os.Exit(1)
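
Both entry points now obtain their configuration from config.NewConfig() and adjust individual fields before handing the pointer to SetupWithManager. The constructor itself is not shown in this diff; the sketch below only illustrates the shape implied by its uses elsewhere in this commit, and the default values are placeholders, not the project's actual defaults.

package config

import "time"

// AppWrapperConfig sketch: field names are inferred from their uses in the
// reconciler, the tests, and the two main.go files touched by this commit.
type AppWrapperConfig struct {
	StandaloneMode             bool
	ManageJobsWithoutQueueName bool
	WarmupGracePeriod          time.Duration
	FailureGracePeriod         time.Duration
	ResetPause                 time.Duration
	RetryLimit                 int32
}

// NewConfig returns a pointer so callers can tweak fields (e.g. StandaloneMode)
// before passing the config to SetupWithManager.
func NewConfig() *AppWrapperConfig {
	return &AppWrapperConfig{
		WarmupGracePeriod:  5 * time.Minute,  // placeholder default
		FailureGracePeriod: 1 * time.Minute,  // placeholder default
		ResetPause:         90 * time.Second, // placeholder default
		RetryLimit:         3,                // placeholder default
	}
}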

config/crd/bases/workload.codeflare.dev_appwrappers.yaml

+5

@@ -212,6 +212,11 @@ spec:
             phase:
               description: Phase of the AppWrapper object
               type: string
+            resettingCount:
+              description: Retries counts the number of times the AppWrapper has
+                entered the Resetting Phase
+              format: int32
+              type: integer
           type: object
         type: object
     served: true

docs/state-diagram.md

+5

@@ -23,6 +23,7 @@ stateDiagram-v2
     sd : Suspended
     rs : Resuming
     rn : Running
+    rt : Resetting
     sg : Suspending
     s : Succeeded
     f : Failed
@@ -36,15 +37,19 @@ stateDiagram-v2
     %% Requeuing
     rs --> sg : Suspend == true
     rn --> sg : Suspend == true
+    rt --> sg : Suspend == true
     sg --> sd
 
     %% Failures
     rs --> f
     rn --> f
+    rn --> rt : Pod Failures
+    rt --> rs
 
     classDef quota fill:lightblue
     class rs quota
     class rn quota
+    class rt quota
     class sg quota
 
     classDef failed fill:pink

internal/controller/appwrapper/appwrapper_controller.go

+162 -21

@@ -19,6 +19,7 @@ package appwrapper
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -168,6 +169,18 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			Reason: string(workloadv1beta2.AppWrapperResuming),
 			Message: "Suspend is false",
 		})
+		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+			Type: string(workloadv1beta2.PodsReady),
+			Status: metav1.ConditionFalse,
+			Reason: string(workloadv1beta2.AppWrapperResuming),
+			Message: "Suspend is false",
+		})
+		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+			Type: string(workloadv1beta2.Unhealthy),
+			Status: metav1.ConditionFalse,
+			Reason: string(workloadv1beta2.AppWrapperResuming),
+			Message: "Suspend is false",
+		})
 		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResuming)
 
 	case workloadv1beta2.AppWrapperResuming: // deploying components
@@ -176,16 +189,17 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		}
 		err, fatal := r.createComponents(ctx, aw)
 		if err != nil {
+			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+				Type: string(workloadv1beta2.Unhealthy),
+				Status: metav1.ConditionTrue,
+				Reason: "CreateFailed",
+				Message: fmt.Sprintf("error creating components: %v", err),
+			})
 			if fatal {
-				meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
-					Type: string(workloadv1beta2.PodsReady),
-					Status: metav1.ConditionFalse,
-					Reason: "CreateFailed",
-					Message: fmt.Sprintf("fatal error creating components: %v", err),
-				})
 				return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed) // abort on fatal error
+			} else {
+				return r.resetOrFail(ctx, aw)
 			}
-			return ctrl.Result{}, err // retry creation on transient error
 		}
 		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperRunning)
 
@@ -197,6 +211,8 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		if err != nil {
 			return ctrl.Result{}, err
 		}
+
+		// Handle Success
 		if podStatus.succeeded >= podStatus.expected && (podStatus.pending+podStatus.running+podStatus.failed == 0) {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type: string(workloadv1beta2.QuotaReserved),
@@ -206,16 +222,30 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			})
 			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSucceeded)
 		}
+
+		// Handle Failed Pods
 		if podStatus.failed > 0 {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
-				Type: string(workloadv1beta2.PodsReady),
-				Status: metav1.ConditionFalse,
-				Reason: "PodsFailed",
-				Message: fmt.Sprintf("%v pods failed (%v pods pending; %v pods running; %v pods succeeded)",
-					podStatus.failed, podStatus.pending, podStatus.running, podStatus.succeeded),
+				Type: string(workloadv1beta2.Unhealthy),
+				Status: metav1.ConditionTrue,
+				Reason: "FoundFailedPods",
+				// Intentionally no detailed message with failed pod count, since changing the message resets the transition time
 			})
-			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
+
+			// Grace period to give the resource controller a chance to correct the failure
+			whenDetected := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.Unhealthy)).LastTransitionTime
+			gracePeriod := r.failureGraceDuration(ctx, aw)
+			now := time.Now()
+			deadline := whenDetected.Add(gracePeriod)
+			if now.Before(deadline) {
+				return ctrl.Result{RequeueAfter: deadline.Sub(now)}, r.Status().Update(ctx, aw)
+			} else {
+				return r.resetOrFail(ctx, aw)
+			}
 		}
+
+		clearCondition(aw, workloadv1beta2.Unhealthy, "FoundNoFailedPods", "")
+
 		if podStatus.running+podStatus.succeeded >= podStatus.expected {
 			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 				Type: string(workloadv1beta2.PodsReady),
@@ -225,14 +255,23 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			})
 			return ctrl.Result{RequeueAfter: time.Minute}, r.Status().Update(ctx, aw)
 		}
-		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
-			Type: string(workloadv1beta2.PodsReady),
-			Status: metav1.ConditionFalse,
-			Reason: "InsufficientPodsReady",
-			Message: fmt.Sprintf("%v pods pending; %v pods running; %v pods succeeded",
-				podStatus.pending, podStatus.running, podStatus.succeeded),
-		})
-		return ctrl.Result{RequeueAfter: 5 * time.Second}, r.Status().Update(ctx, aw)
+
+		// Not ready yet; either continue to wait or giveup if the warmup period has expired
+		podDetailsMessage := fmt.Sprintf("%v pods pending; %v pods running; %v pods succeeded", podStatus.pending, podStatus.running, podStatus.succeeded)
+		clearCondition(aw, workloadv1beta2.PodsReady, "InsufficientPodsReady", podDetailsMessage)
+		whenDeployed := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)).LastTransitionTime
+		warmupDuration := r.warmupGraceDuration(ctx, aw)
+		if time.Now().Before(whenDeployed.Add(warmupDuration)) {
+			return ctrl.Result{RequeueAfter: 5 * time.Second}, r.Status().Update(ctx, aw)
+		} else {
+			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+				Type: string(workloadv1beta2.Unhealthy),
+				Status: metav1.ConditionTrue,
+				Reason: "InsufficientPodsReady",
+				Message: podDetailsMessage,
+			})
+			return r.resetOrFail(ctx, aw)
+		}
 
 	case workloadv1beta2.AppWrapperSuspending: // undeploying components
 		// finish undeploying components irrespective of desired state (suspend bit)
@@ -253,8 +292,45 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			Reason: string(workloadv1beta2.AppWrapperSuspended),
 			Message: "Suspend is true",
 		})
+		clearCondition(aw, workloadv1beta2.PodsReady, string(workloadv1beta2.AppWrapperSuspended), "")
+		clearCondition(aw, workloadv1beta2.Unhealthy, string(workloadv1beta2.AppWrapperSuspended), "")
 		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspended)
 
+	case workloadv1beta2.AppWrapperResetting:
+		if aw.Spec.Suspend {
+			return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspending) // Suspending trumps Resetting
+		}
+
+		clearCondition(aw, workloadv1beta2.PodsReady, string(workloadv1beta2.AppWrapperResetting), "")
+		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
+			if !r.deleteComponents(ctx, aw) {
+				return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
+			}
+			meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+				Type: string(workloadv1beta2.ResourcesDeployed),
+				Status: metav1.ConditionFalse,
+				Reason: string(workloadv1beta2.AppWrapperResetting),
+				Message: "Resources deleted for resetting AppWrapper",
+			})
+		}
+
+		// Pause before transitioning to Resuming to heuristically allow transient system problems to subside
+		whenReset := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.Unhealthy)).LastTransitionTime
+		pauseDuration := r.resettingPauseDuration(ctx, aw)
+		now := time.Now()
+		deadline := whenReset.Add(pauseDuration)
+		if now.Before(deadline) {
+			return ctrl.Result{RequeueAfter: deadline.Sub(now)}, r.Status().Update(ctx, aw)
+		}
+
+		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+			Type: string(workloadv1beta2.ResourcesDeployed),
+			Status: metav1.ConditionTrue,
+			Reason: string(workloadv1beta2.AppWrapperResuming),
+			Message: "Reset complete; resuming",
+		})
+		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResuming)
+
 	case workloadv1beta2.AppWrapperFailed:
 		if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
 			if !r.deleteComponents(ctx, aw) {
@@ -449,6 +525,16 @@ func (r *AppWrapperReconciler) updateStatus(ctx context.Context, aw *workloadv1b
 	return ctrl.Result{}, nil
 }
 
+func (r *AppWrapperReconciler) resetOrFail(ctx context.Context, aw *workloadv1beta2.AppWrapper) (ctrl.Result, error) {
+	maxRetries := r.retryLimit(ctx, aw)
+	if aw.Status.Retries < maxRetries {
+		aw.Status.Retries += 1
+		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResetting)
+	} else {
+		return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
+	}
+}
+
 func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*podStatusSummary, error) {
 	pods := &v1.PodList{}
 	if err := r.List(ctx, pods,
@@ -474,6 +560,61 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv
 	return summary, nil
 }
 
+func (r *AppWrapperReconciler) warmupGraceDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
+	if userPeriod, ok := aw.Annotations[workloadv1beta2.WarmupGracePeriodDurationAnnotation]; ok {
+		if duration, err := time.ParseDuration(userPeriod); err == nil {
+			return duration
+		} else {
+			log.FromContext(ctx).Info("Malformed warmup period annotation", "annotation", userPeriod, "error", err)
+		}
+	}
+	return r.Config.WarmupGracePeriod
+}
+
+func (r *AppWrapperReconciler) failureGraceDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
+	if userPeriod, ok := aw.Annotations[workloadv1beta2.FailureGracePeriodDurationAnnotation]; ok {
+		if duration, err := time.ParseDuration(userPeriod); err == nil {
+			return duration
+		} else {
+			log.FromContext(ctx).Info("Malformed grace period annotation", "annotation", userPeriod, "error", err)
+		}
+	}
+	return r.Config.FailureGracePeriod
+}
+
+func (r *AppWrapperReconciler) retryLimit(ctx context.Context, aw *workloadv1beta2.AppWrapper) int32 {
+	if userLimit, ok := aw.Annotations[workloadv1beta2.RetryLimitAnnotation]; ok {
+		if limit, err := strconv.Atoi(userLimit); err == nil {
+			return int32(limit)
+		} else {
+			log.FromContext(ctx).Info("Malformed retry limit annotation", "annotation", userLimit, "error", err)
+		}
+	}
+	return r.Config.RetryLimit
+}
+
+func (r *AppWrapperReconciler) resettingPauseDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
+	if userPeriod, ok := aw.Annotations[workloadv1beta2.ResetPauseDurationAnnotation]; ok {
+		if duration, err := time.ParseDuration(userPeriod); err == nil {
+			return duration
+		} else {
+			log.FromContext(ctx).Info("Malformed reset pause annotation", "annotation", userPeriod, "error", err)
+		}
+	}
+	return r.Config.ResetPause
+}
+
+func clearCondition(aw *workloadv1beta2.AppWrapper, condition workloadv1beta2.AppWrapperCondition, reason string, message string) {
+	if meta.IsStatusConditionTrue(aw.Status.Conditions, string(condition)) {
+		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
+			Type: string(condition),
+			Status: metav1.ConditionFalse,
+			Reason: reason,
+			Message: message,
+		})
+	}
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
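
Both the failure grace period and the resetting pause above use the same pattern: derive a deadline from a condition's LastTransitionTime and requeue for exactly the remaining interval. The following self-contained snippet, which is an illustration rather than code from this commit, shows that arithmetic in isolation.

package main

import (
	"fmt"
	"time"
)

// remainingWait mirrors the reconciler's deadline arithmetic: given when a condition
// last transitioned and the configured period, it reports how long to requeue for,
// or that the period has already expired (time to reset or fail).
func remainingWait(lastTransition time.Time, period time.Duration, now time.Time) (time.Duration, bool) {
	deadline := lastTransition.Add(period)
	if now.Before(deadline) {
		return deadline.Sub(now), true
	}
	return 0, false
}

func main() {
	detected := time.Now().Add(-20 * time.Second) // Unhealthy became true 20 seconds ago
	wait, stillWaiting := remainingWait(detected, time.Minute, time.Now())
	fmt.Printf("requeue after %v (still in grace period: %v)\n", wait.Round(time.Second), stillWaiting)
}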

internal/controller/appwrapper/appwrapper_controller_test.go

+7 -1

@@ -17,6 +17,8 @@ limitations under the License.
 package appwrapper
 
 import (
+	"time"
+
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
@@ -54,10 +56,14 @@ var _ = Describe("AppWrapper Controller", func() {
 			Name: aw.Name,
 			Namespace: aw.Namespace,
 		}
+		awConfig := config.NewConfig()
+		awConfig.FailureGracePeriod = 0 * time.Second
+		awConfig.ResetPause = 0 * time.Second
+		awConfig.RetryLimit = 0
 		awReconciler = &AppWrapperReconciler{
 			Client: k8sClient,
 			Scheme: k8sClient.Scheme(),
-			Config: &config.AppWrapperConfig{ManageJobsWithoutQueueName: true, StandaloneMode: false},
+			Config: awConfig,
 		}
 		kueuePodSets = (*workload.AppWrapper)(aw).PodSets()
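
The test zeroes FailureGracePeriod, ResetPause, and RetryLimit so that a single pod failure exercises the reset path without waiting and immediately exhausts the retry budget. The decision itself reduces to the comparison in resetOrFail; the standalone sketch below is illustrative, not the project's code.

package main

import "fmt"

// nextPhase captures the resetOrFail decision: retry (enter Resetting) while the
// retry budget lasts, otherwise give up (enter Failed).
func nextPhase(retriesSoFar, retryLimit int32) string {
	if retriesSoFar < retryLimit {
		return "Resetting"
	}
	return "Failed"
}

func main() {
	fmt.Println(nextPhase(0, 0)) // RetryLimit = 0 as in the test: Failed immediately
	fmt.Println(nextPhase(1, 3)) // budget remaining: Resetting
}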
