diff --git a/pkg/controller/admissionchecks/provisioning/controller.go b/pkg/controller/admissionchecks/provisioning/controller.go index 083f5cd191..fc32a7efd4 100644 --- a/pkg/controller/admissionchecks/provisioning/controller.go +++ b/pkg/controller/admissionchecks/provisioning/controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -241,17 +242,17 @@ func (c *Controller) syncOwnedProvisionRequest( continue } - oldPr, exists := activeOrLastPRForChecks[checkName] + req, exists := activeOrLastPRForChecks[checkName] attempt := int32(1) shouldCreatePr := false if exists { - if (isFailed(oldPr) || (isBookingExpired(oldPr) && !workload.IsAdmitted(wl))) && + if (isFailed(req) || (isBookingExpired(req) && !workload.IsAdmitted(wl))) && // if the workload is Admitted we don't want to retry on BookingExpired ac != nil && ac.State == kueue.CheckStatePending { // if the workload is in Retry/Rejected state we don't create another ProvReq - attempt = getAttempt(log, oldPr, wl.Name, checkName) + attempt = getAttempt(log, req, wl.Name, checkName) if features.Enabled(features.KeepQuotaForProvReqRetry) { - remainingTime := c.remainingTimeToRetry(oldPr, attempt, prc) + remainingTime := c.remainingTimeToRetry(req, attempt, prc) if remainingTime <= 0 { shouldCreatePr = true attempt++ @@ -269,7 +270,7 @@ func (c *Controller) syncOwnedProvisionRequest( requestName := ProvisioningRequestName(wl.Name, checkName, attempt) if shouldCreatePr { log.V(3).Info("Creating ProvisioningRequest", "requestName", requestName, "attempt", attempt) - req := &autoscaling.ProvisioningRequest{ + req = &autoscaling.ProvisioningRequest{ ObjectMeta: metav1.ObjectMeta{ Name: requestName, Namespace: wl.Namespace, @@ -293,9 +294,27 @@ func (c *Controller) syncOwnedProvisionRequest( if !psFound || !psaFound { return nil, errInconsistentPodSetAssignments } + + ptName := getProvisioningRequestPodTemplateName(requestName, psName) + + pt := &corev1.PodTemplate{} + err := c.client.Get(ctx, types.NamespacedName{Namespace: wl.Namespace, Name: ptName}, pt) + if client.IgnoreNotFound(err) != nil { + return nil, err + } + if err != nil { + // it's a not found, so create it + _, err := c.createPodTemplate(ctx, wl, ptName, ps, psa) + if err != nil { + msg := api.TruncateEventMessage(fmt.Sprintf("Error creating PodTemplate %q: %v", ptName, err)) + c.record.Eventf(wl, corev1.EventTypeWarning, "FailedCreate", msg) + return nil, c.handleError(ctx, wl, ac, msg, err) + } + } + req.Spec.PodSets = append(req.Spec.PodSets, autoscaling.PodSet{ PodTemplateRef: autoscaling.Reference{ - Name: getProvisioningRequestPodTemplateName(requestName, psName), + Name: ptName, }, Count: ptr.Deref(psa.Count, ps.Count), }) @@ -313,7 +332,7 @@ func (c *Controller) syncOwnedProvisionRequest( c.record.Eventf(wl, corev1.EventTypeNormal, "ProvisioningRequestCreated", "Created ProvisioningRequest: %q", req.Name) activeOrLastPRForChecks[checkName] = req } - if err := c.syncProvisionRequestsPodTemplates(ctx, wl, requestName, prc); err != nil { + if err := c.syncProvisionRequestsPodTemplates(ctx, wl, req); err != nil { return nil, err } } @@ -354,40 +373,49 @@ func (c *Controller) handleError(ctx context.Context, wl *kueue.Workload, ac *ku return errors.Join(err, patchErr) } -func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl *kueue.Workload, prName string, prc *kueue.ProvisioningRequestConfig) error { - request := &autoscaling.ProvisioningRequest{} - requestKey := types.NamespacedName{ - Name: prName, - Namespace: wl.Namespace, +func (c *Controller) createPodTemplate(ctx context.Context, wl *kueue.Workload, name string, ps *kueue.PodSet, psa *kueue.PodSetAssignment) (*corev1.PodTemplate, error) { + newPt := &corev1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: wl.Namespace, + Labels: map[string]string{ + constants.ManagedByKueueLabel: "true", + }, + }, + Template: ps.Template, + } + + // set the controller reference to workload so that the template is not left orphaned + // if the ProvisioningRequest creation fails. The ownership is later transferred to the + // ProvisioningRequest. + if err := ctrl.SetControllerReference(wl, newPt, c.client.Scheme()); err != nil { + return nil, err + } + + // apply the admission node selectors to the Template + psi, err := podset.FromAssignment(ctx, c.client, psa, ptr.Deref(psa.Count, ps.Count)) + if err != nil { + return nil, err } - err := c.client.Get(ctx, requestKey, request) + + err = podset.Merge(&newPt.Template.ObjectMeta, &newPt.Template.Spec, psi) if err != nil { - return client.IgnoreNotFound(err) + return nil, err } - expectedPodSets := requiredPodSets(wl.Spec.PodSets, prc.Spec.ManagedResources) - podsetRefsMap := slices.ToMap(expectedPodSets, func(i int) (string, string) { - return getProvisioningRequestPodTemplateName(prName, expectedPodSets[i]), expectedPodSets[i] - }) + // copy limits to requests if needed + workload.UseLimitsAsMissingRequestsInPod(&newPt.Template.Spec) - // the order of the podSets should be the same in the workload and prov. req. - // if the number is different, just delete the request - if len(request.Spec.PodSets) != len(expectedPodSets) { - return c.client.Delete(ctx, request) + if err := c.client.Create(ctx, newPt); err != nil { + return nil, err } - psaMap := slices.ToRefMap(wl.Status.Admission.PodSetAssignments, func(p *kueue.PodSetAssignment) string { return p.Name }) - podSetMap := slices.ToRefMap(wl.Spec.PodSets, func(ps *kueue.PodSet) string { return ps.Name }) + return newPt, nil +} +func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl *kueue.Workload, request *autoscaling.ProvisioningRequest) error { for i := range request.Spec.PodSets { reqPS := &request.Spec.PodSets[i] - psName, refFound := podsetRefsMap[reqPS.PodTemplateRef.Name] - ps, psFound := podSetMap[psName] - psa, psaFound := psaMap[psName] - - if !refFound || !psFound || !psaFound || ptr.Deref(psa.Count, 0) != reqPS.Count { - return c.client.Delete(ctx, request) - } pt := &corev1.PodTemplate{} ptKey := types.NamespacedName{ @@ -396,47 +424,34 @@ func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl * } err := c.client.Get(ctx, ptKey, pt) - if client.IgnoreNotFound(err) != nil { return err } - if err != nil { - // it's a not found, so create it - newPt := &corev1.PodTemplate{ - ObjectMeta: metav1.ObjectMeta{ - Name: ptKey.Name, - Namespace: ptKey.Namespace, - Labels: map[string]string{ - constants.ManagedByKueueLabel: "true", - }, - }, - Template: ps.Template, - } + if err == nil { + var shouldUpdate bool - // apply the admission node selectors to the Template - psi, err := podset.FromAssignment(ctx, c.client, psaMap[psName], reqPS.Count) - if err != nil { - return err - } - - err = podset.Merge(&newPt.Template.ObjectMeta, &newPt.Template.Spec, psi) - if err != nil { - return err + // transfer the ownership of the template to the ProvisioningRequest + if metav1.IsControlledBy(pt, wl) { + if err := controllerutil.RemoveControllerReference(wl, pt, c.client.Scheme()); err != nil { + return err + } + shouldUpdate = true } - // copy limits to requests if needed - workload.UseLimitsAsMissingRequestsInPod(&newPt.Template.Spec) - - if err := ctrl.SetControllerReference(request, newPt, c.client.Scheme()); err != nil { - return err + if !metav1.IsControlledBy(pt, request) { + if err := controllerutil.SetControllerReference(request, pt, c.client.Scheme()); err != nil { + return err + } + shouldUpdate = true } - if err = c.client.Create(ctx, newPt); err != nil { - return err + if shouldUpdate { + if err := c.client.Update(ctx, pt); err != nil { + return err + } } } - // maybe check the consistency deeper } return nil } diff --git a/pkg/controller/admissionchecks/provisioning/controller_test.go b/pkg/controller/admissionchecks/provisioning/controller_test.go index 48157c05a7..5e651744c3 100644 --- a/pkg/controller/admissionchecks/provisioning/controller_test.go +++ b/pkg/controller/admissionchecks/provisioning/controller_test.go @@ -29,6 +29,7 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" "k8s.io/component-base/featuregate" @@ -45,7 +46,10 @@ import ( "sigs.k8s.io/kueue/pkg/workload" ) -var errInvalidProvisioningRequest = errors.New("invalid ProvisioningRequest error") +var ( + errInvalidPodTemplate = errors.New("invalid PodTemplate error") + errInvalidProvisioningRequest = errors.New("invalid ProvisioningRequest error") +) var ( wlCmpOptions = []cmp.Option{ @@ -64,7 +68,8 @@ var ( tmplCmpOptions = []cmp.Option{ cmpopts.EquateEmpty(), - cmpopts.IgnoreTypes(metav1.ObjectMeta{}, metav1.TypeMeta{}), + cmpopts.IgnoreTypes(metav1.TypeMeta{}), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), cmpopts.IgnoreFields(corev1.PodSpec{}, "RestartPolicy"), } @@ -133,8 +138,7 @@ func TestReconcile(t *testing.T) { }, kueue.AdmissionCheckState{ Name: "not-provisioning", State: kueue.CheckStatePending, - }). - Obj() + }) basePodSet := []autoscaling.PodSet{{PodTemplateRef: autoscaling.Reference{Name: "ppt-wl-check1-1-main"}, Count: 1}} @@ -189,73 +193,35 @@ func TestReconcile(t *testing.T) { }, } - baseTemplate1 := &corev1.PodTemplate{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: TestNamespace, - Name: "ppt-wl-check1-1-ps1", - Labels: map[string]string{ - constants.ManagedByKueueLabel: "true", - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: "wl-check1-1", - }, - }, - }, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "c", - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - }, - }, - }, - }, - NodeSelector: map[string]string{"f1l1": "v1"}, - Tolerations: []corev1.Toleration{ - { - Key: "f1t1k", - Value: "f1t1v", - Operator: corev1.TolerationOpEqual, - Effect: corev1.TaintEffectNoSchedule, - }, + baseTemplate1 := utiltesting.MakePodTemplate("ppt-wl-check1-1-ps1", TestNamespace). + Label(constants.ManagedByKueueLabel, "true"). + Containers(corev1.Container{ + Name: "c", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), }, }, - }, - } + }). + NodeSelector("f1l1", "v1"). + Toleration(corev1.Toleration{ + Key: "f1t1k", + Value: "f1t1v", + Operator: corev1.TolerationOpEqual, + Effect: corev1.TaintEffectNoSchedule, + }) - baseTemplate2 := &corev1.PodTemplate{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: TestNamespace, - Name: "ppt-wl-check1-1-ps2", - Labels: map[string]string{ - constants.ManagedByKueueLabel: "true", - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: "wl-check1-1", - }, - }, - }, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "c", - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("1M"), - }, - }, - }, + baseTemplate2 := utiltesting.MakePodTemplate("ppt-wl-check1-1-ps2", TestNamespace). + Label(constants.ManagedByKueueLabel, "true"). + Containers(corev1.Container{ + Name: "c", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1M"), }, - NodeSelector: map[string]string{"f2l1": "v1"}, }, - }, - } + }). + NodeSelector("f2l1", "v1") baseConfig := utiltesting.MakeProvisioningRequestConfig("config1").ProvisioningClass("class1").WithParameter("p1", "v1") @@ -271,6 +237,8 @@ func TestReconcile(t *testing.T) { Obj() cases := map[string]struct { + interceptorFuncsCreate func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error + requests []autoscaling.ProvisioningRequest templates []corev1.PodTemplate checks []kueue.AdmissionCheck @@ -303,7 +271,7 @@ func TestReconcile(t *testing.T) { workload: baseWorkload.DeepCopy(), checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, @@ -321,14 +289,26 @@ func TestReconcile(t *testing.T) { flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*baseConfigWithRetryStrategy.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: baseWorkload.DeepCopy(), + baseWorkload.GetName(): baseWorkload.DeepCopy(), }, wantRequests: map[string]*autoscaling.ProvisioningRequest{ baseRequest.Name: baseRequest.DeepCopy(), }, wantTemplates: map[string]*corev1.PodTemplate{ - baseTemplate1.Name: baseTemplate1.DeepCopy(), - baseTemplate2.Name: baseTemplate2.DeepCopy(), + baseTemplate1.Name: baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), + baseTemplate2.Name: baseTemplate2.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), }, wantEvents: []utiltesting.EventRecord{ { @@ -401,7 +381,7 @@ func TestReconcile(t *testing.T) { }, }, }, - wantWorkloads: map[string]*kueue.Workload{baseWorkload.Name: baseWorkload.DeepCopy()}, + wantWorkloads: map[string]*kueue.Workload{baseWorkload.GetName(): baseWorkload.DeepCopy()}, wantRequestsNotFound: []string{"wl-check2"}, wantEvents: []utiltesting.EventRecord{ { @@ -412,22 +392,50 @@ func TestReconcile(t *testing.T) { }, }, }, - "missing one template": { - workload: baseWorkload.DeepCopy(), - checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, - flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, - configs: []kueue.ProvisioningRequestConfig{*baseConfigWithRetryStrategy.DeepCopy()}, - requests: []autoscaling.ProvisioningRequest{*baseRequest.DeepCopy()}, - templates: []corev1.PodTemplate{*baseTemplate1.DeepCopy()}, + "one template already created": { + workload: baseWorkload.DeepCopy(), + checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, + flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, + configs: []kueue.ProvisioningRequestConfig{*baseConfigWithRetryStrategy.DeepCopy()}, + requests: []autoscaling.ProvisioningRequest{}, + templates: []corev1.PodTemplate{ + *baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "kueue.x-k8s.io", + Version: "v1beta1", + Kind: "Workload", + }, "wl", ""). + Obj(), + }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: baseWorkload.DeepCopy(), + baseWorkload.GetName(): baseWorkload.DeepCopy(), }, wantRequests: map[string]*autoscaling.ProvisioningRequest{ baseRequest.Name: baseRequest.DeepCopy(), }, wantTemplates: map[string]*corev1.PodTemplate{ - baseTemplate1.Name: baseTemplate1.DeepCopy(), - baseTemplate2.Name: baseTemplate2.DeepCopy(), + baseTemplate1.Name: baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), + baseTemplate2.Name: baseTemplate2.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: client.ObjectKeyFromObject(baseWorkload), + EventType: corev1.EventTypeNormal, + Reason: "ProvisioningRequestCreated", + Message: `Created ProvisioningRequest: "wl-check1-1"`, + }, }, }, "request out of sync": { @@ -463,14 +471,26 @@ func TestReconcile(t *testing.T) { }, }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: baseWorkload.DeepCopy(), + baseWorkload.GetName(): baseWorkload.DeepCopy(), }, wantRequests: map[string]*autoscaling.ProvisioningRequest{ baseRequest.Name: baseRequest.DeepCopy(), }, wantTemplates: map[string]*corev1.PodTemplate{ - baseTemplate1.Name: baseTemplate1.DeepCopy(), - baseTemplate2.Name: baseTemplate2.DeepCopy(), + baseTemplate1.Name: baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), + baseTemplate2.Name: baseTemplate2.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), }, wantEvents: []utiltesting.EventRecord{ { @@ -507,7 +527,7 @@ func TestReconcile(t *testing.T) { enableGates: []featuregate.Feature{features.KeepQuotaForProvReqRetry}, templates: []corev1.PodTemplate{*baseTemplate1.DeepCopy(), *baseTemplate2.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, @@ -536,7 +556,7 @@ func TestReconcile(t *testing.T) { }, templates: []corev1.PodTemplate{*baseTemplate1.DeepCopy(), *baseTemplate2.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRetry, @@ -559,7 +579,7 @@ func TestReconcile(t *testing.T) { }, templates: []corev1.PodTemplate{*baseTemplate1.DeepCopy(), *baseTemplate2.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRejected, @@ -580,7 +600,7 @@ func TestReconcile(t *testing.T) { }, templates: []corev1.PodTemplate{*baseTemplate1.DeepCopy(), *baseTemplate2.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateReady, @@ -617,7 +637,7 @@ func TestReconcile(t *testing.T) { flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*baseConfig.Clone().WithManagedResource("example.org/gpu").Obj()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateReady, @@ -635,7 +655,7 @@ func TestReconcile(t *testing.T) { flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*baseConfig.Clone().WithManagedResource(corev1.ResourceMemory).Obj()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: baseWorkload.DeepCopy(), + baseWorkload.GetName(): baseWorkload.DeepCopy(), }, wantRequests: map[string]*autoscaling.ProvisioningRequest{ "wl-check1-1": { @@ -661,7 +681,13 @@ func TestReconcile(t *testing.T) { }, }, wantTemplates: map[string]*corev1.PodTemplate{ - baseTemplate2.Name: baseTemplate2.DeepCopy(), + baseTemplate2.Name: baseTemplate2.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Obj(), }, wantEvents: []utiltesting.EventRecord{ { @@ -678,7 +704,7 @@ func TestReconcile(t *testing.T) { flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*baseConfig.Clone().WithManagedResource("example.com/gpu").Obj()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).Limit("example.com/gpu", "1").Obj(), + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}).Limit("example.com/gpu", "1").Obj(), }, wantRequests: map[string]*autoscaling.ProvisioningRequest{ "wl-check1-1": { @@ -704,47 +730,25 @@ func TestReconcile(t *testing.T) { }, }, wantTemplates: map[string]*corev1.PodTemplate{ - baseTemplate1.Name: &corev1.PodTemplate{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: TestNamespace, - Name: "ppt-wl-check1-1-ps1", - Labels: map[string]string{ - constants.ManagedByKueueLabel: "true", - }, - OwnerReferences: []metav1.OwnerReference{ - { - Name: "wl-check1-1", - }, - }, - }, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "c", - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - "example.com/gpu": resource.MustParse("1"), - }, - Limits: corev1.ResourceList{ - "example.com/gpu": resource.MustParse("1"), - }, - }, - }, + baseTemplate1.Name: baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "autoscaling.x-k8s.io", + Version: "v1beta1", + Kind: "ProvisioningRequest", + }, "wl-check1-1", ""). + Containers(corev1.Container{ + Name: "c", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + "example.com/gpu": resource.MustParse("1"), }, - NodeSelector: map[string]string{"f1l1": "v1"}, - Tolerations: []corev1.Toleration{ - { - Key: "f1t1k", - Value: "f1t1v", - Operator: corev1.TolerationOpEqual, - Effect: corev1.TaintEffectNoSchedule, - }, + Limits: corev1.ResourceList{ + "example.com/gpu": resource.MustParse("1"), }, }, - }, - }, + }). + Obj(), }, wantEvents: []utiltesting.EventRecord{ { @@ -761,7 +765,7 @@ func TestReconcile(t *testing.T) { flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*baseConfigWithRetryStrategy.DeepCopy()}, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: baseWorkloadWithCheck1Ready.DeepCopy(), + baseWorkload.GetName(): baseWorkloadWithCheck1Ready.DeepCopy(), }, wantRequestsNotFound: []string{ ProvisioningRequestName("wl", "check1", 1), @@ -793,7 +797,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, @@ -834,7 +838,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRejected, @@ -875,7 +879,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRejected, @@ -921,7 +925,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, @@ -962,7 +966,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). Admitted(true). Obj(), }, @@ -997,7 +1001,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, @@ -1082,7 +1086,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRetry, @@ -1125,7 +1129,7 @@ func TestReconcile(t *testing.T) { }), }, wantWorkloads: map[string]*kueue.Workload{ - baseWorkload.Name: (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). + baseWorkload.GetName(): (&utiltesting.WorkloadWrapper{Workload: *baseWorkload.DeepCopy()}). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateRejected, @@ -1137,7 +1141,13 @@ func TestReconcile(t *testing.T) { Obj(), }, }, - "when invalid provisioning request": { + "when pod template creation error": { + interceptorFuncsCreate: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if _, ok := obj.(*corev1.PodTemplate); ok { + return errInvalidPodTemplate + } + return client.Create(ctx, obj, opts...) + }, workload: utiltesting.MakeWorkload("wl", TestNamespace). Annotations(map[string]string{ "provreq.kueue.x-k8s.io/ValidUntilSeconds": "0", @@ -1150,21 +1160,80 @@ func TestReconcile(t *testing.T) { Obj(), checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, configs: []kueue.ProvisioningRequestConfig{*utiltesting.MakeProvisioningRequestConfig("config1").Obj()}, - wantReconcileError: errInvalidProvisioningRequest, + wantReconcileError: errInvalidPodTemplate, wantWorkloads: map[string]*kueue.Workload{ "wl": utiltesting.MakeWorkload("wl", TestNamespace). Annotations(map[string]string{ "provreq.kueue.x-k8s.io/ValidUntilSeconds": "0", "invalid-provreq-prefix/Foo1": "Bar1", - "another-invalid-provreq-prefix/Foo2": "Bar2"}). + "another-invalid-provreq-prefix/Foo2": "Bar2", + }). AdmissionChecks(kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStatePending, - Message: "Error creating ProvisioningRequest \"wl-check1-1\": invalid ProvisioningRequest error", + Message: "Error creating PodTemplate \"ppt-wl-check1-1-main\": invalid PodTemplate error", }). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, + wantEvents: []utiltesting.EventRecord{ + { + Key: client.ObjectKeyFromObject(baseWorkload), + EventType: corev1.EventTypeWarning, + Reason: "FailedCreate", + Message: `Error creating PodTemplate "ppt-wl-check1-1-main": invalid PodTemplate error`, + }, + }, + }, + "when provisioning request creation error": { + interceptorFuncsCreate: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if _, ok := obj.(*autoscaling.ProvisioningRequest); ok { + return errInvalidProvisioningRequest + } + return client.Create(ctx, obj, opts...) + }, + workload: baseWorkload.DeepCopy(), + checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, + flavors: []kueue.ResourceFlavor{*baseFlavor1.DeepCopy(), *baseFlavor2.DeepCopy()}, + configs: []kueue.ProvisioningRequestConfig{*baseConfigWithRetryStrategy.DeepCopy()}, + requests: []autoscaling.ProvisioningRequest{}, + templates: []corev1.PodTemplate{}, + wantReconcileError: errInvalidProvisioningRequest, + wantWorkloads: map[string]*kueue.Workload{ + baseWorkload.GetName(): baseWorkload. + Clone(). + AdmissionChecks( + kueue.AdmissionCheckState{ + Name: "check1", + State: kueue.CheckStatePending, + Message: "Error creating ProvisioningRequest \"wl-check1-1\": invalid ProvisioningRequest error", + }, + kueue.AdmissionCheckState{ + Name: "not-provisioning", + State: kueue.CheckStatePending, + }, + ). + Obj(), + }, + wantRequests: map[string]*autoscaling.ProvisioningRequest{ + baseRequest.Name: {}, + }, + wantTemplates: map[string]*corev1.PodTemplate{ + baseTemplate1.Name: baseTemplate1.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "kueue.x-k8s.io", + Version: "v1beta1", + Kind: "Workload", + }, "wl", ""). + Obj(), + baseTemplate2.Name: baseTemplate2.Clone(). + ControllerReference(schema.GroupVersionKind{ + Group: "kueue.x-k8s.io", + Version: "v1beta1", + Kind: "Workload", + }, "wl", ""). + Obj(), + }, wantEvents: []utiltesting.EventRecord{ { Key: client.ObjectKeyFromObject(baseWorkload), @@ -1183,10 +1252,8 @@ func TestReconcile(t *testing.T) { } interceptorFuncs := interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge} - if tc.wantReconcileError != nil { - interceptorFuncs.Create = func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { - return tc.wantReconcileError - } + if tc.interceptorFuncsCreate != nil { + interceptorFuncs.Create = tc.interceptorFuncsCreate } builder, ctx := getClientBuilder() @@ -1235,7 +1302,7 @@ func TestReconcile(t *testing.T) { for name, wantRequest := range tc.wantRequests { gotRequest := &autoscaling.ProvisioningRequest{} - if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotRequest); err != nil { + if err := k8sclient.Get(ctx, types.NamespacedName{Namespace: TestNamespace, Name: name}, gotRequest); client.IgnoreNotFound(err) != nil { t.Errorf("unexpected error getting request %q: %s", name, err) } diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index ebe2768a90..060a6bf7a8 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -308,24 +309,12 @@ func (w *WorkloadWrapper) Conditions(conditions ...metav1.Condition) *WorkloadWr } func (w *WorkloadWrapper) ControllerReference(gvk schema.GroupVersionKind, name, uid string) *WorkloadWrapper { - w.appendOwnerReference(gvk, name, uid, ptr.To(true), ptr.To(true)) + appendOwnerReference(&w.Workload, gvk, name, uid, ptr.To(true), ptr.To(true)) return w } func (w *WorkloadWrapper) OwnerReference(gvk schema.GroupVersionKind, name, uid string) *WorkloadWrapper { - w.appendOwnerReference(gvk, name, uid, nil, nil) - return w -} - -func (w *WorkloadWrapper) appendOwnerReference(gvk schema.GroupVersionKind, name, uid string, controller, blockDeletion *bool) *WorkloadWrapper { - w.OwnerReferences = append(w.OwnerReferences, metav1.OwnerReference{ - APIVersion: gvk.GroupVersion().String(), - Kind: gvk.Kind, - Name: name, - UID: types.UID(uid), - Controller: controller, - BlockOwnerDeletion: blockDeletion, - }) + appendOwnerReference(&w.Workload, gvk, name, uid, nil, nil) return w } @@ -1433,3 +1422,68 @@ func (prc *ProvisioningRequestConfigWrapper) Clone() *ProvisioningRequestConfigW func (prc *ProvisioningRequestConfigWrapper) Obj() *kueue.ProvisioningRequestConfig { return &prc.ProvisioningRequestConfig } + +type PodTemplateWrapper struct { + corev1.PodTemplate +} + +func MakePodTemplate(name, namespace string) *PodTemplateWrapper { + return &PodTemplateWrapper{ + corev1.PodTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + } +} + +func (w *PodTemplateWrapper) Obj() *corev1.PodTemplate { + return &w.PodTemplate +} + +func (w *PodTemplateWrapper) Clone() *PodTemplateWrapper { + return &PodTemplateWrapper{PodTemplate: *w.DeepCopy()} +} + +func (w *PodTemplateWrapper) Label(k, v string) *PodTemplateWrapper { + if w.ObjectMeta.Labels == nil { + w.ObjectMeta.Labels = make(map[string]string) + } + w.ObjectMeta.Labels[k] = v + return w +} + +func (w *PodTemplateWrapper) Containers(containers ...corev1.Container) *PodTemplateWrapper { + w.Template.Spec.Containers = containers + return w +} + +func (w *PodTemplateWrapper) NodeSelector(k, v string) *PodTemplateWrapper { + if w.Template.Spec.NodeSelector == nil { + w.Template.Spec.NodeSelector = make(map[string]string) + } + w.Template.Spec.NodeSelector[k] = v + return w +} + +func (w *PodTemplateWrapper) Toleration(toleration corev1.Toleration) *PodTemplateWrapper { + w.Template.Spec.Tolerations = append(w.Template.Spec.Tolerations, toleration) + return w +} + +func (w *PodTemplateWrapper) ControllerReference(gvk schema.GroupVersionKind, name, uid string) *PodTemplateWrapper { + appendOwnerReference(&w.PodTemplate, gvk, name, uid, ptr.To(true), ptr.To(true)) + return w +} + +func appendOwnerReference(obj client.Object, gvk schema.GroupVersionKind, name, uid string, controller, blockDeletion *bool) { + obj.SetOwnerReferences(append(obj.GetOwnerReferences(), metav1.OwnerReference{ + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + UID: types.UID(uid), + Controller: controller, + BlockOwnerDeletion: blockDeletion, + })) +}