Skip to content

Commit 47e15a3

Browse files
authored
Propagate admission to child workload objects (#46)
If an AppWrapper contains a component that has its own Kueue-enabled controller, propagate the admission status of the AppWrapper to the component's Workload object.
1 parent 9a9ec5f commit 47e15a3

File tree

7 files changed

+110
-51
lines changed

7 files changed

+110
-51
lines changed

internal/controller/appwrapper_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os"
2323

2424
ctrl "sigs.k8s.io/controller-runtime"
25+
2526
"sigs.k8s.io/kueue/pkg/controller/jobframework"
2627
)
2728

internal/controller/appwrapper_controller.go

+67-27
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,28 @@ import (
2828
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2929
"k8s.io/apimachinery/pkg/runtime"
3030
"k8s.io/apimachinery/pkg/types"
31+
3132
ctrl "sigs.k8s.io/controller-runtime"
3233
"sigs.k8s.io/controller-runtime/pkg/client"
3334
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3435
"sigs.k8s.io/controller-runtime/pkg/handler"
3536
"sigs.k8s.io/controller-runtime/pkg/log"
3637
"sigs.k8s.io/controller-runtime/pkg/reconcile"
38+
39+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
40+
"sigs.k8s.io/kueue/pkg/controller/constants"
41+
"sigs.k8s.io/kueue/pkg/controller/jobframework"
3742
"sigs.k8s.io/kueue/pkg/podset"
3843
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
44+
"sigs.k8s.io/kueue/pkg/workload"
3945

4046
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
4147
)
4248

4349
const (
4450
AppWrapperLabel = "workload.codeflare.dev/appwrapper"
4551
appWrapperFinalizer = "workload.codeflare.dev/finalizer"
52+
childJobQueueName = "workload.codeflare.dev.admitted"
4653
)
4754

4855
// AppWrapperReconciler reconciles an appwrapper
@@ -212,6 +219,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
212219
})
213220
return ctrl.Result{RequeueAfter: time.Minute}, r.Status().Update(ctx, aw)
214221
}
222+
r.propagateAdmission(ctx, aw)
215223
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
216224
Type: string(workloadv1beta2.PodsReady),
217225
Status: metav1.ConditionFalse,
@@ -266,14 +274,6 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
266274
return ctrl.Result{}, nil
267275
}
268276

269-
// SetupWithManager sets up the controller with the Manager.
270-
func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
271-
return ctrl.NewControllerManagedBy(mgr).
272-
For(&workloadv1beta2.AppWrapper{}).
273-
Watches(&v1.Pod{}, handler.EnqueueRequestsFromMapFunc(r.podMapFunc)).
274-
Complete(r)
275-
}
276-
277277
// podMapFunc maps pods to appwrappers
278278
func (r *AppWrapperReconciler) podMapFunc(ctx context.Context, obj client.Object) []reconcile.Request {
279279
pod := obj.(*v1.Pod)
@@ -299,7 +299,8 @@ func parseComponent(aw *workloadv1beta2.AppWrapper, raw []byte) (*unstructured.U
299299
return obj, nil
300300
}
301301

302-
func materializeObject(aw *workloadv1beta2.AppWrapper, component *workloadv1beta2.AppWrapperComponent) (client.Object, error) {
302+
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (*unstructured.Unstructured, error, bool) {
303+
component := aw.Spec.Components[componentIdx]
303304
toMap := func(x interface{}) map[string]string {
304305
if x == nil {
305306
return nil
@@ -322,19 +323,20 @@ func materializeObject(aw *workloadv1beta2.AppWrapper, component *workloadv1beta
322323
}
323324
}
324325
}
325-
awLabels := map[string]string{AppWrapperLabel: aw.Name}
326326

327327
obj, err := parseComponent(aw, component.Template.Raw)
328328
if err != nil {
329-
return nil, err
329+
return nil, err, true
330330
}
331+
obj.SetLabels(utilmaps.MergeKeepFirst(obj.GetLabels(), map[string]string{AppWrapperLabel: aw.Name, constants.QueueLabel: childJobQueueName}))
331332

333+
awLabels := map[string]string{AppWrapperLabel: aw.Name}
332334
for podSetsIdx, podSet := range component.PodSets {
333335
toInject := component.PodSetInfos[podSetsIdx]
334336

335337
p, err := getRawTemplate(obj.UnstructuredContent(), podSet.Path)
336338
if err != nil {
337-
return nil, err // Should not happen, path validity is enforced by validateAppWrapperInvariants
339+
return nil, err, true // Should not happen, path validity is enforced by validateAppWrapperInvariants
338340
}
339341
if md, ok := p["metadata"]; !ok || md == nil {
340342
p["metadata"] = make(map[string]interface{})
@@ -346,7 +348,7 @@ func materializeObject(aw *workloadv1beta2.AppWrapper, component *workloadv1beta
346348
if len(toInject.Annotations) > 0 {
347349
existing := toMap(metadata["annotations"])
348350
if err := utilmaps.HaveConflict(existing, toInject.Annotations); err != nil {
349-
return nil, podset.BadPodSetsUpdateError("annotations", err)
351+
return nil, podset.BadPodSetsUpdateError("annotations", err), true
350352
}
351353
metadata["annotations"] = utilmaps.MergeKeepFirst(existing, toInject.Annotations)
352354
}
@@ -355,15 +357,15 @@ func materializeObject(aw *workloadv1beta2.AppWrapper, component *workloadv1beta
355357
mergedLabels := utilmaps.MergeKeepFirst(toInject.Labels, awLabels)
356358
existing := toMap(metadata["labels"])
357359
if err := utilmaps.HaveConflict(existing, mergedLabels); err != nil {
358-
return nil, podset.BadPodSetsUpdateError("labels", err)
360+
return nil, podset.BadPodSetsUpdateError("labels", err), true
359361
}
360362
metadata["labels"] = utilmaps.MergeKeepFirst(existing, mergedLabels)
361363

362364
// NodeSelectors
363365
if len(toInject.NodeSelector) > 0 {
364366
existing := toMap(metadata["nodeSelector"])
365367
if err := utilmaps.HaveConflict(existing, toInject.NodeSelector); err != nil {
366-
return nil, podset.BadPodSetsUpdateError("nodeSelector", err)
368+
return nil, podset.BadPodSetsUpdateError("nodeSelector", err), true
367369
}
368370
metadata["nodeSelector"] = utilmaps.MergeKeepFirst(existing, toInject.NodeSelector)
369371
}
@@ -381,27 +383,57 @@ func materializeObject(aw *workloadv1beta2.AppWrapper, component *workloadv1beta
381383
}
382384
}
383385

384-
return obj, nil
386+
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
387+
return nil, err, true
388+
}
389+
390+
if err := r.Create(ctx, obj); err != nil {
391+
if !apierrors.IsAlreadyExists(err) {
392+
return nil, err, meta.IsNoMatchError(err) || apierrors.IsInvalid(err) // fatal
393+
}
394+
}
395+
396+
return obj, nil, false
385397
}
386398

387399
func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) (error, bool) {
388-
for _, component := range aw.Spec.Components {
389-
obj, err := materializeObject(aw, &component)
400+
for componentIdx := range aw.Spec.Components {
401+
_, err, fatal := r.createComponent(ctx, aw, componentIdx)
390402
if err != nil {
391-
return err, true
403+
return err, fatal
392404
}
405+
}
406+
return nil, false
407+
}
393408

394-
if err := controllerutil.SetControllerReference(aw, obj, r.Scheme); err != nil {
395-
return err, true
396-
}
397-
if err := r.Create(ctx, obj); err != nil {
398-
if apierrors.IsAlreadyExists(err) {
399-
continue // ignore existing component
409+
func (r *AppWrapperReconciler) propagateAdmission(ctx context.Context, aw *workloadv1beta2.AppWrapper) {
410+
for componentIdx, component := range aw.Spec.Components {
411+
if len(component.PodSets) > 0 {
412+
obj, err := parseComponent(aw, component.Template.Raw)
413+
if err != nil {
414+
return
415+
}
416+
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(obj.GetName(), obj.GroupVersionKind())
417+
wl := &kueue.Workload{}
418+
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: aw.Namespace, Name: wlName}, wl); err == nil {
419+
if !workload.IsAdmitted(wl) {
420+
admission := kueue.Admission{
421+
ClusterQueue: childJobQueueName,
422+
PodSetAssignments: make([]kueue.PodSetAssignment, len(aw.Spec.Components[componentIdx].PodSets)),
423+
}
424+
for i := range admission.PodSetAssignments {
425+
admission.PodSetAssignments[i].Name = wl.Spec.PodSets[i].Name
426+
}
427+
newWorkload := wl.DeepCopy()
428+
workload.SetQuotaReservation(newWorkload, &admission)
429+
_ = workload.SyncAdmittedCondition(newWorkload)
430+
if err = workload.ApplyAdmissionStatus(ctx, r.Client, newWorkload, false); err != nil {
431+
log.FromContext(ctx).Error(err, "syncing admission", "appwrapper", aw, "componentIdx", componentIdx, "workload", wl, "newworkload", newWorkload)
432+
}
433+
}
400434
}
401-
return err, meta.IsNoMatchError(err) || apierrors.IsInvalid(err) // fatal
402435
}
403436
}
404-
return nil, false
405437
}
406438

407439
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
@@ -476,3 +508,11 @@ func ExpectedPodCount(aw *workloadv1beta2.AppWrapper) int32 {
476508
}
477509
return expected
478510
}
511+
512+
// SetupWithManager sets up the controller with the Manager.
513+
func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
514+
return ctrl.NewControllerManagedBy(mgr).
515+
For(&workloadv1beta2.AppWrapper{}).
516+
Watches(&v1.Pod{}, handler.EnqueueRequestsFromMapFunc(r.podMapFunc)).
517+
Complete(r)
518+
}

internal/controller/appwrapper_webhook.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323

24-
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
2524
authv1 "k8s.io/api/authorization/v1"
2625
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2726
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -31,11 +30,15 @@ import (
3130
discovery "k8s.io/client-go/discovery"
3231
"k8s.io/client-go/kubernetes"
3332
authClientv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
33+
3434
ctrl "sigs.k8s.io/controller-runtime"
3535
"sigs.k8s.io/controller-runtime/pkg/log"
3636
"sigs.k8s.io/controller-runtime/pkg/webhook"
3737
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
38+
3839
"sigs.k8s.io/kueue/pkg/controller/jobframework"
40+
41+
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
3942
)
4043

4144
type AppWrapperWebhook struct {

internal/controller/suite_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"sigs.k8s.io/controller-runtime/pkg/webhook"
4747

4848
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
49+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4950
)
5051

5152
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
@@ -104,6 +105,8 @@ var _ = BeforeSuite(func() {
104105
Expect(err).NotTo(HaveOccurred())
105106
err = clientgoscheme.AddToScheme(scheme)
106107
Expect(err).NotTo(HaveOccurred())
108+
err = kueue.AddToScheme(scheme)
109+
Expect(err).NotTo(HaveOccurred())
107110

108111
//+kubebuilder:scaffold:scheme
109112

internal/controller/utils.go

+23
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
v1 "k8s.io/api/core/v1"
2424
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2525
"k8s.io/apimachinery/pkg/runtime"
26+
27+
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
28+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
2629
)
2730

2831
// getPodTemplateSpec extracts a Kueue-compatible PodTemplateSpec at the given path within obj
@@ -56,6 +59,26 @@ func getPodTemplateSpec(obj *unstructured.Unstructured, path string) (*v1.PodTem
5659
return template, nil
5760
}
5861

62+
func getKueuePodSets(obj *unstructured.Unstructured, component *workloadv1beta2.AppWrapperComponent, awName string, componentIdx int) ([]kueue.PodSet, error) {
63+
podSets := []kueue.PodSet{}
64+
for psIdx, podSet := range component.PodSets {
65+
replicas := int32(1)
66+
if podSet.Replicas != nil {
67+
replicas = *podSet.Replicas
68+
}
69+
template, err := getPodTemplateSpec(obj, podSet.Path)
70+
if err != nil {
71+
return nil, err
72+
}
73+
podSets = append(podSets, kueue.PodSet{
74+
Name: fmt.Sprintf("%s-%v-%v", awName, componentIdx, psIdx),
75+
Template: *template,
76+
Count: replicas,
77+
})
78+
}
79+
return podSets, nil
80+
}
81+
5982
// return the subobject found at the given path, or nil if the path is invalid
6083
func getRawTemplate(obj map[string]interface{}, path string) (map[string]interface{}, error) {
6184
parts := strings.Split(path, ".")

internal/controller/workload_controller.go

+11-22
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ limitations under the License.
1717
package controller
1818

1919
import (
20-
"fmt"
21-
2220
"k8s.io/apimachinery/pkg/api/meta"
2321
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2422
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2523
"k8s.io/apimachinery/pkg/runtime/schema"
24+
2625
"sigs.k8s.io/controller-runtime/pkg/client"
26+
2727
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
2828
"sigs.k8s.io/kueue/pkg/controller/jobframework"
2929
"sigs.k8s.io/kueue/pkg/podset"
@@ -68,26 +68,15 @@ func (aw *AppWrapper) GVK() schema.GroupVersionKind {
6868

6969
func (aw *AppWrapper) PodSets() []kueue.PodSet {
7070
podSets := []kueue.PodSet{}
71-
i := 0
72-
for _, component := range aw.Spec.Components {
73-
obj := &unstructured.Unstructured{}
74-
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, obj); err != nil {
75-
continue
76-
}
77-
78-
for _, podSet := range component.PodSets {
79-
replicas := int32(1)
80-
if podSet.Replicas != nil {
81-
replicas = *podSet.Replicas
71+
for componentIdx, component := range aw.Spec.Components {
72+
if len(component.PodSets) > 0 {
73+
obj := &unstructured.Unstructured{}
74+
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, obj); err != nil {
75+
continue // Should be unreachable; Template.Raw validated by our AdmissionController
8276
}
83-
template, err := getPodTemplateSpec(obj, podSet.Path)
77+
toAdd, err := getKueuePodSets(obj, &component, aw.Name, componentIdx)
8478
if err == nil {
85-
podSets = append(podSets, kueue.PodSet{
86-
Name: aw.Name + "-" + fmt.Sprint(i),
87-
Template: *template,
88-
Count: replicas,
89-
})
90-
i++
79+
podSets = append(podSets, toAdd...)
9180
}
9281
}
9382
}
@@ -99,13 +88,13 @@ func (aw *AppWrapper) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error
9988
podSetsInfoIndex := 0
10089
for componentIdx := range aw.Spec.Components {
10190
component := &aw.Spec.Components[componentIdx]
102-
if len(component.PodSets) != len(component.PodSetInfos) {
91+
if len(component.PodSetInfos) != len(component.PodSets) {
10392
component.PodSetInfos = make([]workloadv1beta2.AppWrapperPodSetInfo, len(component.PodSets))
10493
}
10594
for podSetIdx := range component.PodSets {
10695
podSetsInfoIndex += 1
10796
if podSetsInfoIndex > len(podSetsInfo) {
108-
continue // we're going to return an error below...just continuing to get an accurate count for the error message
97+
continue // we will return an error below...continuing to get an accurate count for the error message
10998
}
11099
component.PodSetInfos[podSetIdx] = workloadv1beta2.AppWrapperPodSetInfo{
111100
Annotations: podSetsInfo[podSetIdx].Annotations,

samples/wrapped-job.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ spec:
2222
containers:
2323
- name: busybox
2424
image: quay.io/project-codeflare/busybox:1.36
25-
command: ["sh", "-c", "sleep 600"]
25+
command: ["sh", "-c", "sleep 30"]
2626
resources:
2727
requests:
2828
cpu: 1

0 commit comments

Comments
 (0)