diff --git a/pkg/controller/workload/common/workload_utils.go b/pkg/controller/workload/common/workload_utils.go new file mode 100644 index 0000000000..5c37b03280 --- /dev/null +++ b/pkg/controller/workload/common/workload_utils.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +func GetNameForJob(jobName string) string { + return "job-" + jobName +} diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go index cd7ee57378..66c543722f 100644 --- a/pkg/controller/workload/job/job_controller.go +++ b/pkg/controller/workload/job/job_controller.go @@ -41,6 +41,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2" "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/workload/common" utilpriority "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -525,7 +526,7 @@ func ConstructWorkloadFor(ctx context.Context, client client.Client, job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Workload, error) { w := &kueue.Workload{ ObjectMeta: metav1.ObjectMeta{ - Name: job.Name, + Name: common.GetNameForJob(job.Name), Namespace: job.Namespace, }, Spec: kueue.WorkloadSpec{ diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 320b82934e..cc76cc5012 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/types" kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2" + "sigs.k8s.io/kueue/pkg/controller/workload/common" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/test/util" ) @@ -60,9 +61,10 @@ var _ = ginkgo.Describe("Kueue", func() { } return *createdJob.Spec.Suspend }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + wlLookupKey := types.NamespacedName{Name: common.GetNameForJob(lookupKey.Name), Namespace: ns.Name} createdWorkload := &kueue.Workload{} gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted) @@ -106,8 +108,9 @@ var _ = ginkgo.Describe("Kueue", func() { } return !*createdJob.Spec.Suspend && createdJob.Status.Succeeded > 0 }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + wlLookupKey := types.NamespacedName{Name: common.GetNameForJob(lookupKey.Name), Namespace: ns.Name} gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted) && diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index 3b04f2e9be..7bb4060c0b 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -32,6 +32,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2" "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/workload/common" workloadjob "sigs.k8s.io/kueue/pkg/controller/workload/job" "sigs.k8s.io/kueue/pkg/util/pointer" "sigs.k8s.io/kueue/pkg/util/testing" @@ -44,12 +45,16 @@ const ( parallelism = 4 jobName = "test-job" jobNamespace = "default" - jobKey = jobNamespace + "/" + jobName labelKey = "cloud.provider.com/instance" priorityClassName = "test-priority-class" priorityValue = 10 ) +var ( + lookupKey = types.NamespacedName{Name: jobName, Namespace: jobNamespace} + wlLookupKey = types.NamespacedName{Name: common.GetNameForJob(jobName), Namespace: jobNamespace} +) + var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") // +kubebuilder:docs-gen:collapse=Imports @@ -73,7 +78,6 @@ var _ = ginkgo.Describe("Job controller", func() { gomega.Expect(k8sClient.Create(ctx, priorityClass)).Should(gomega.Succeed()) job := testing.MakeJob(jobName, jobNamespace).PriorityClass(priorityClassName).Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) - lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} createdJob := &batchv1.Job{} gomega.Eventually(func() bool { if err := k8sClient.Get(ctx, lookupKey, createdJob); err != nil { @@ -85,7 +89,7 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.By("checking the workload is created without queue assigned") createdWorkload := &kueue.Workload{} gomega.Eventually(func() bool { - err := k8sClient.Get(ctx, lookupKey, createdWorkload) + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) return err == nil }, util.Timeout, util.Interval).Should(gomega.BeTrue()) gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal(""), "The Workload shouldn't have .spec.queueName set") @@ -100,7 +104,7 @@ var _ = ginkgo.Describe("Job controller", func() { createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return createdWorkload.Spec.QueueName == jobQueueName @@ -108,7 +112,7 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.By("checking a second non-matching workload is deleted") secondWl, _ := workloadjob.ConstructWorkloadFor(ctx, k8sClient, createdJob, scheme.Scheme) - secondWl.Name = "second-workload" + secondWl.Name = common.GetNameForJob("second-workload") secondWl.Spec.PodSets[0].Count = parallelism + 1 gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed()) gomega.Eventually(func() error { @@ -118,7 +122,7 @@ var _ = ginkgo.Describe("Job controller", func() { }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) // check the original wl is still there gomega.Consistently(func() bool { - err := k8sClient.Get(ctx, lookupKey, createdWorkload) + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) return err == nil }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) gomega.Eventually(func() bool { @@ -158,7 +162,7 @@ var _ = ginkgo.Describe("Job controller", func() { gomega.Expect(len(createdJob.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(1)) gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(onDemandFlavor.Name)) gomega.Consistently(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return len(createdWorkload.Status.Conditions) == 0 @@ -176,13 +180,13 @@ var _ = ginkgo.Describe("Job controller", func() { len(createdJob.Spec.Template.Spec.NodeSelector) == 0 }, util.Timeout, util.Interval).Should(gomega.BeTrue()) gomega.Eventually(func() bool { - ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", jobKey)) + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "DeletedWorkload", corev1.EventTypeNormal, fmt.Sprintf("Deleted not matching Workload: %v", wlLookupKey.String())) return ok }, util.Timeout, util.Interval).Should(gomega.BeTrue()) ginkgo.By("checking the workload is updated with new count") gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return createdWorkload.Spec.PodSets[0].Count == newParallelism @@ -208,7 +212,7 @@ var _ = ginkgo.Describe("Job controller", func() { gomega.Expect(len(createdJob.Spec.Template.Spec.NodeSelector)).Should(gomega.Equal(1)) gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector[labelKey]).Should(gomega.Equal(spotFlavor.Name)) gomega.Consistently(func() bool { - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return false } return len(createdWorkload.Status.Conditions) == 0 @@ -224,7 +228,7 @@ var _ = ginkgo.Describe("Job controller", func() { }) gomega.Expect(k8sClient.Status().Update(ctx, createdJob)).Should(gomega.Succeed()) gomega.Eventually(func() bool { - err := k8sClient.Get(ctx, lookupKey, createdWorkload) + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) if err != nil || len(createdWorkload.Status.Conditions) == 0 { return false } @@ -236,10 +240,11 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.When("The parent-workload annotation is used", func() { var ( - parentJobName = jobName + "-parent" - parentLookupKey = types.NamespacedName{Name: parentJobName, Namespace: jobNamespace} - childJobName = jobName + "-child" - childLookupKey = types.NamespacedName{Name: childJobName, Namespace: jobNamespace} + parentJobName = jobName + "-parent" + parentWlLookupKey = types.NamespacedName{Name: common.GetNameForJob(parentJobName), Namespace: jobNamespace} + childJobName = jobName + "-child" + childLookupKey = types.NamespacedName{Name: childJobName, Namespace: jobNamespace} + childWlLookupKey = types.NamespacedName{Name: common.GetNameForJob(childJobName), Namespace: jobNamespace} ) ginkgo.It("Should suspend a job if the parent workload does not exist", func() { @@ -262,7 +267,7 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.By("waiting for the parent workload to be created") parentWorkload := &kueue.Workload{} gomega.Eventually(func() error { - return k8sClient.Get(ctx, parentLookupKey, parentWorkload) + return k8sClient.Get(ctx, parentWlLookupKey, parentWorkload) }, util.Timeout, util.Interval).Should(gomega.Succeed()) ginkgo.By("Creating the child job which uses the parent workload annotation") @@ -272,7 +277,7 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.By("Checking that the child workload is not created") childWorkload := &kueue.Workload{} gomega.Consistently(func() bool { - return apierrors.IsNotFound(k8sClient.Get(ctx, childLookupKey, childWorkload)) + return apierrors.IsNotFound(k8sClient.Get(ctx, childWlLookupKey, childWorkload)) }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) }) @@ -286,13 +291,13 @@ var _ = ginkgo.Describe("Job controller", func() { gomega.Expect(k8sClient.Create(ctx, parentJob)).Should(gomega.Succeed()) ginkgo.By("Creating the child job with the parent-workload annotation") - childJob := testing.MakeJob(childJobName, jobNamespace).ParentWorkload(parentJobName).Obj() + childJob := testing.MakeJob(childJobName, jobNamespace).ParentWorkload(parentWlLookupKey.Name).Obj() gomega.Expect(k8sClient.Create(ctx, childJob)).Should(gomega.Succeed()) ginkgo.By("waiting for the parent workload to be created") parentWorkload := &kueue.Workload{} gomega.Eventually(func() error { - return k8sClient.Get(ctx, parentLookupKey, parentWorkload) + return k8sClient.Get(ctx, parentWlLookupKey, parentWorkload) }, util.Timeout, util.Interval).Should(gomega.Succeed()) ginkgo.By("admit the parent workload") @@ -316,7 +321,7 @@ var _ = ginkgo.Describe("Job controller", func() { ginkgo.By("Unset admission of the workload to suspend the job") gomega.Eventually(func() error { - if err := k8sClient.Get(ctx, parentLookupKey, parentWorkload); err != nil { + if err := k8sClient.Get(ctx, parentWlLookupKey, parentWorkload); err != nil { return err } parentWorkload.Spec.Admission = nil @@ -347,13 +352,12 @@ var _ = ginkgo.Describe("Job controller for workloads with no queue set", func() ginkgo.By("checking the workload is not created when queue name is not set") job := testing.MakeJob(jobName, jobNamespace).Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) - lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} createdJob := &batchv1.Job{} gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) createdWorkload := &kueue.Workload{} gomega.Consistently(func() bool { - return apierrors.IsNotFound(k8sClient.Get(ctx, lookupKey, createdWorkload)) + return apierrors.IsNotFound(k8sClient.Get(ctx, wlLookupKey, createdWorkload)) }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) ginkgo.By("checking the workload is created when queue name is set") @@ -361,7 +365,7 @@ var _ = ginkgo.Describe("Job controller for workloads with no queue set", func() createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) gomega.Eventually(func() error { - return k8sClient.Get(ctx, lookupKey, createdWorkload) + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.When("The parent-workload annotation is used", func() { @@ -417,14 +421,13 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", func() { jobQueueName := "test-queue" job.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) - lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} createdJob := &batchv1.Job{} gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) ginkgo.By("Fetch the workload created for the job") createdWorkload := &kueue.Workload{} gomega.Eventually(func() error { - return k8sClient.Get(ctx, lookupKey, createdWorkload) + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) }, util.Timeout, util.Interval).Should(gomega.Succeed()) ginkgo.By("Admit the workload created for the job") @@ -437,7 +440,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", func() { }}, } gomega.Expect(k8sClient.Update(ctx, createdWorkload)).Should(gomega.Succeed()) - gomega.Expect(k8sClient.Get(ctx, lookupKey, createdWorkload)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) ginkgo.By("Await for the job to be unsuspended") gomega.Eventually(func() *bool { @@ -455,7 +458,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", func() { if podsReadyTestSpec.beforeCondition != nil { ginkgo.By("Update the workload status") gomega.Eventually(func() *metav1.Condition { - gomega.Expect(k8sClient.Get(ctx, lookupKey, createdWorkload)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) return apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadPodsReady) }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(podsReadyTestSpec.beforeCondition, ignoreConditionTimestamps)) } @@ -470,7 +473,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", func() { gomega.Eventually(func() error { // the update may need to be retried due to a conflict as the workload gets // also updated due to setting of the job status. - if err := k8sClient.Get(ctx, lookupKey, createdWorkload); err != nil { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { return err } createdWorkload.Spec.Admission = nil @@ -480,7 +483,7 @@ var _ = ginkgo.Describe("Job controller when waitForPodsReady enabled", func() { ginkgo.By("Verify the PodsReady condition is added") gomega.Eventually(func() *metav1.Condition { - gomega.Expect(k8sClient.Get(ctx, lookupKey, createdWorkload)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) return apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadPodsReady) }, util.Timeout, util.Interval).Should(gomega.BeComparableTo(podsReadyTestSpec.wantCondition, ignoreConditionTimestamps)) },