Skip to content

Commit 0db706e

Browse files
authored
pull resource creation/deletion code to separate file (#70)
1 parent e1f43d6 commit 0db706e

File tree

2 files changed

+196
-167
lines changed

2 files changed

+196
-167
lines changed

internal/controller/appwrapper/appwrapper_controller.go

+11-167
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ import (
2323
"time"
2424

2525
v1 "k8s.io/api/core/v1"
26-
apierrors "k8s.io/apimachinery/pkg/api/errors"
2726
"k8s.io/apimachinery/pkg/api/meta"
2827
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3028
"k8s.io/apimachinery/pkg/runtime"
3129
"k8s.io/apimachinery/pkg/types"
3230

@@ -37,10 +35,6 @@ import (
3735
"sigs.k8s.io/controller-runtime/pkg/log"
3836
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3937

40-
"sigs.k8s.io/kueue/pkg/controller/constants"
41-
"sigs.k8s.io/kueue/pkg/podset"
42-
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
43-
4438
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
4539
"github.com/project-codeflare/appwrapper/pkg/config"
4640
"github.com/project-codeflare/appwrapper/pkg/utils"
@@ -355,167 +349,6 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
355349
return ctrl.Result{}, nil
356350
}
357351

358-
// podMapFunc maps pods to appwrappers
359-
func (r *AppWrapperReconciler) podMapFunc(ctx context.Context, obj client.Object) []reconcile.Request {
360-
pod := obj.(*v1.Pod)
361-
if name, ok := pod.Labels[AppWrapperLabel]; ok {
362-
if pod.Status.Phase == v1.PodSucceeded {
363-
return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: name}}}
364-
}
365-
}
366-
return nil
367-
}
368-
369-
func parseComponent(aw *workloadv1beta2.AppWrapper, raw []byte) (*unstructured.Unstructured, error) {
370-
obj := &unstructured.Unstructured{}
371-
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, obj); err != nil {
372-
return nil, err
373-
}
374-
namespace := obj.GetNamespace()
375-
if namespace == "" {
376-
obj.SetNamespace(aw.Namespace)
377-
} else if namespace != aw.Namespace {
378-
// Should not happen, namespace equality checked by validateAppWrapperInvariants
379-
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, aw.Namespace)
380-
}
381-
return obj, nil
382-
}
383-
384-
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (*unstructured.Unstructured, error, bool) {
385-
component := aw.Spec.Components[componentIdx]
386-
toMap := func(x interface{}) map[string]string {
387-
if x == nil {
388-
return nil
389-
} else {
390-
if sm, ok := x.(map[string]string); ok {
391-
return sm
392-
} else if im, ok := x.(map[string]interface{}); ok {
393-
sm := make(map[string]string, len(im))
394-
for k, v := range im {
395-
str, ok := v.(string)
396-
if ok {
397-
sm[k] = str
398-
} else {
399-
sm[k] = fmt.Sprint(v)
400-
}
401-
}
402-
return sm
403-
} else {
404-
return nil
405-
}
406-
}
407-
}
408-
409-
obj, err := parseComponent(aw, component.Template.Raw)
410-
if err != nil {
411-
return nil, err, true
412-
}
413-
if r.Config.StandaloneMode {
414-
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), map[string]string{AppWrapperLabel: aw.Name}))
415-
} else {
416-
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), map[string]string{AppWrapperLabel: aw.Name, constants.QueueLabel: childJobQueueName}))
417-
}
418-
419-
awLabels := map[string]string{AppWrapperLabel: aw.Name}
420-
for podSetsIdx, podSet := range component.PodSets {
421-
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
422-
if !r.Config.StandaloneMode {
423-
toInject = &component.PodSetInfos[podSetsIdx]
424-
}
425-
426-
p, err := utils.GetRawTemplate(obj.UnstructuredContent(), podSet.Path)
427-
if err != nil {
428-
return nil, err, true // Should not happen, path validity is enforced by validateAppWrapperInvariants
429-
}
430-
if md, ok := p["metadata"]; !ok || md == nil {
431-
p["metadata"] = make(map[string]interface{})
432-
}
433-
metadata := p["metadata"].(map[string]interface{})
434-
spec := p["spec"].(map[string]interface{}) // Must exist, enforced by validateAppWrapperInvariants
435-
436-
// Annotations
437-
if len(toInject.Annotations) > 0 {
438-
existing := toMap(metadata["annotations"])
439-
if err := utilmaps.HaveConflict(existing, toInject.Annotations); err != nil {
440-
return nil, podset.BadPodSetsUpdateError("annotations", err), true
441-
}
442-
metadata["annotations"] = utilmaps.MergeKeepFirst(existing, toInject.Annotations)
443-
}
444-
445-
// Labels
446-
mergedLabels := utilmaps.MergeKeepFirst(toInject.Labels, awLabels)
447-
existing := toMap(metadata["labels"])
448-
if err := utilmaps.HaveConflict(existing, mergedLabels); err != nil {
449-
return nil, podset.BadPodSetsUpdateError("labels", err), true
450-
}
451-
metadata["labels"] = utilmaps.MergeKeepFirst(existing, mergedLabels)
452-
453-
// NodeSelectors
454-
if len(toInject.NodeSelector) > 0 {
455-
existing := toMap(metadata["nodeSelector"])
456-
if err := utilmaps.HaveConflict(existing, toInject.NodeSelector); err != nil {
457-
return nil, podset.BadPodSetsUpdateError("nodeSelector", err), true
458-
}
459-
metadata["nodeSelector"] = utilmaps.MergeKeepFirst(existing, toInject.NodeSelector)
460-
}
461-
462-
// Tolerations
463-
if len(toInject.Tolerations) > 0 {
464-
if _, ok := spec["tolerations"]; !ok {
465-
spec["tolerations"] = []interface{}{}
466-
}
467-
tolerations := spec["tolerations"].([]interface{})
468-
for _, addition := range toInject.Tolerations {
469-
tolerations = append(tolerations, addition)
470-
}
471-
spec["tolerations"] = tolerations
472-
}
473-
}
474-
475-
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
476-
return nil, err, true
477-
}
478-
479-
if err := r.Create(ctx, obj); err != nil {
480-
if !apierrors.IsAlreadyExists(err) {
481-
return nil, err, meta.IsNoMatchError(err) || apierrors.IsInvalid(err) // fatal
482-
}
483-
}
484-
485-
return obj, nil, false
486-
}
487-
488-
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
489-
for componentIdx := range aw.Spec.Components {
490-
_, err, fatal := r.createComponent(ctx, aw, componentIdx)
491-
if err != nil {
492-
return err, fatal
493-
}
494-
}
495-
return nil, false
496-
}
497-
498-
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
499-
// TODO forceful deletion: See https://github.com/project-codeflare/appwrapper/issues/36
500-
log := log.FromContext(ctx)
501-
remaining := 0
502-
for _, component := range aw.Spec.Components {
503-
obj, err := parseComponent(aw, component.Template.Raw)
504-
if err != nil {
505-
log.Error(err, "Parsing error")
506-
continue
507-
}
508-
if err := r.Delete(ctx, obj, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
509-
if !apierrors.IsNotFound(err) {
510-
log.Error(err, "Deletion error")
511-
}
512-
continue
513-
}
514-
remaining++ // no error deleting resource, resource therefore still exists
515-
}
516-
return remaining == 0
517-
}
518-
519352
func (r *AppWrapperReconciler) updateStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper, phase workloadv1beta2.AppWrapperPhase) (ctrl.Result, error) {
520353
aw.Status.Phase = phase
521354
if err := r.Status().Update(ctx, aw); err != nil {
@@ -615,6 +448,17 @@ func clearCondition(aw *workloadv1beta2.AppWrapper, condition workloadv1beta2.Ap
615448
}
616449
}
617450

451+
// podMapFunc maps pods to appwrappers
452+
func (r *AppWrapperReconciler) podMapFunc(ctx context.Context, obj client.Object) []reconcile.Request {
453+
pod := obj.(*v1.Pod)
454+
if name, ok := pod.Labels[AppWrapperLabel]; ok {
455+
if pod.Status.Phase == v1.PodSucceeded {
456+
return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: pod.Namespace, Name: name}}}
457+
}
458+
}
459+
return nil
460+
}
461+
618462
// SetupWithManager sets up the controller with the Manager.
619463
func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
620464
return ctrl.NewControllerManagedBy(mgr).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
Copyright 2024 IBM Corporation.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package appwrapper
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
24+
"github.com/project-codeflare/appwrapper/pkg/utils"
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/api/meta"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
31+
"sigs.k8s.io/controller-runtime/pkg/log"
32+
"sigs.k8s.io/kueue/pkg/controller/constants"
33+
"sigs.k8s.io/kueue/pkg/podset"
34+
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
35+
)
36+
37+
func parseComponent(aw *workloadv1beta2.AppWrapper, raw []byte) (*unstructured.Unstructured, error) {
38+
obj := &unstructured.Unstructured{}
39+
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, obj); err != nil {
40+
return nil, err
41+
}
42+
namespace := obj.GetNamespace()
43+
if namespace == "" {
44+
obj.SetNamespace(aw.Namespace)
45+
} else if namespace != aw.Namespace {
46+
// Should not happen, namespace equality checked by validateAppWrapperInvariants
47+
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, aw.Namespace)
48+
}
49+
return obj, nil
50+
}
51+
52+
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (*unstructured.Unstructured, error, bool) {
53+
component := aw.Spec.Components[componentIdx]
54+
toMap := func(x interface{}) map[string]string {
55+
if x == nil {
56+
return nil
57+
} else {
58+
if sm, ok := x.(map[string]string); ok {
59+
return sm
60+
} else if im, ok := x.(map[string]interface{}); ok {
61+
sm := make(map[string]string, len(im))
62+
for k, v := range im {
63+
str, ok := v.(string)
64+
if ok {
65+
sm[k] = str
66+
} else {
67+
sm[k] = fmt.Sprint(v)
68+
}
69+
}
70+
return sm
71+
} else {
72+
return nil
73+
}
74+
}
75+
}
76+
77+
obj, err := parseComponent(aw, component.Template.Raw)
78+
if err != nil {
79+
return nil, err, true
80+
}
81+
if r.Config.StandaloneMode {
82+
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), map[string]string{AppWrapperLabel: aw.Name}))
83+
} else {
84+
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), map[string]string{AppWrapperLabel: aw.Name, constants.QueueLabel: childJobQueueName}))
85+
}
86+
87+
awLabels := map[string]string{AppWrapperLabel: aw.Name}
88+
for podSetsIdx, podSet := range component.PodSets {
89+
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
90+
if !r.Config.StandaloneMode {
91+
toInject = &component.PodSetInfos[podSetsIdx]
92+
}
93+
94+
p, err := utils.GetRawTemplate(obj.UnstructuredContent(), podSet.Path)
95+
if err != nil {
96+
return nil, err, true // Should not happen, path validity is enforced by validateAppWrapperInvariants
97+
}
98+
if md, ok := p["metadata"]; !ok || md == nil {
99+
p["metadata"] = make(map[string]interface{})
100+
}
101+
metadata := p["metadata"].(map[string]interface{})
102+
spec := p["spec"].(map[string]interface{}) // Must exist, enforced by validateAppWrapperInvariants
103+
104+
// Annotations
105+
if len(toInject.Annotations) > 0 {
106+
existing := toMap(metadata["annotations"])
107+
if err := utilmaps.HaveConflict(existing, toInject.Annotations); err != nil {
108+
return nil, podset.BadPodSetsUpdateError("annotations", err), true
109+
}
110+
metadata["annotations"] = utilmaps.MergeKeepFirst(existing, toInject.Annotations)
111+
}
112+
113+
// Labels
114+
mergedLabels := utilmaps.MergeKeepFirst(toInject.Labels, awLabels)
115+
existing := toMap(metadata["labels"])
116+
if err := utilmaps.HaveConflict(existing, mergedLabels); err != nil {
117+
return nil, podset.BadPodSetsUpdateError("labels", err), true
118+
}
119+
metadata["labels"] = utilmaps.MergeKeepFirst(existing, mergedLabels)
120+
121+
// NodeSelectors
122+
if len(toInject.NodeSelector) > 0 {
123+
existing := toMap(metadata["nodeSelector"])
124+
if err := utilmaps.HaveConflict(existing, toInject.NodeSelector); err != nil {
125+
return nil, podset.BadPodSetsUpdateError("nodeSelector", err), true
126+
}
127+
metadata["nodeSelector"] = utilmaps.MergeKeepFirst(existing, toInject.NodeSelector)
128+
}
129+
130+
// Tolerations
131+
if len(toInject.Tolerations) > 0 {
132+
if _, ok := spec["tolerations"]; !ok {
133+
spec["tolerations"] = []interface{}{}
134+
}
135+
tolerations := spec["tolerations"].([]interface{})
136+
for _, addition := range toInject.Tolerations {
137+
tolerations = append(tolerations, addition)
138+
}
139+
spec["tolerations"] = tolerations
140+
}
141+
}
142+
143+
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
144+
return nil, err, true
145+
}
146+
147+
if err := r.Create(ctx, obj); err != nil {
148+
if !apierrors.IsAlreadyExists(err) {
149+
return nil, err, meta.IsNoMatchError(err) || apierrors.IsInvalid(err) // fatal
150+
}
151+
}
152+
153+
return obj, nil, false
154+
}
155+
156+
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
157+
for componentIdx := range aw.Spec.Components {
158+
_, err, fatal := r.createComponent(ctx, aw, componentIdx)
159+
if err != nil {
160+
return err, fatal
161+
}
162+
}
163+
return nil, false
164+
}
165+
166+
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
167+
// TODO forceful deletion: See https://github.com/project-codeflare/appwrapper/issues/36
168+
log := log.FromContext(ctx)
169+
remaining := 0
170+
for _, component := range aw.Spec.Components {
171+
obj, err := parseComponent(aw, component.Template.Raw)
172+
if err != nil {
173+
log.Error(err, "Parsing error")
174+
continue
175+
}
176+
if err := r.Delete(ctx, obj, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
177+
if !apierrors.IsNotFound(err) {
178+
log.Error(err, "Deletion error")
179+
}
180+
continue
181+
}
182+
remaining++ // no error deleting resource, resource therefore still exists
183+
}
184+
return remaining == 0
185+
}

0 commit comments

Comments
 (0)