Skip to content

Commit e33a788

Browse files
authored
Implement PodSet inference for JobSet (#316)
Fixes #304.
1 parent 9a46540 commit e33a788

File tree

7 files changed

+144
-4
lines changed

7 files changed

+144
-4
lines changed

config/rbac/role.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ rules:
9090
- patch
9191
- update
9292
- watch
93+
- apiGroups:
94+
- jobset.x-k8s.io
95+
resources:
96+
- jobsets
97+
verbs:
98+
- create
99+
- delete
100+
- get
101+
- list
102+
- patch
103+
- update
104+
- watch
93105
- apiGroups:
94106
- kubeflow.org
95107
resources:

hack/e2e-util.sh

+14-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ export IMAGE_KUBEFLOW_OPERATOR="docker.io/kubeflow/training-operator:v1-855e096"
2929
export KUBERAY_VERSION=1.1.0
3030
export IMAGE_KUBERAY_OPERATOR="quay.io/kuberay/operator:v1.1.1"
3131

32+
export JOBSET_VERSION=v0.7.3
33+
export IMAGE_JOBSET_OPERATOR="registry.k8s.io/jobset/jobset:v0.7.3"
34+
3235
# These are small images used by the e2e tests.
3336
# Pull and kind load to avoid long delays during testing
3437
export IMAGE_ECHOSERVER="quay.io/project-codeflare/echo-server:1.0"
@@ -142,7 +145,7 @@ function check_prerequisites {
142145
}
143146

144147
function pull_images {
145-
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR}
148+
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR}
146149
do
147150
docker pull $image
148151
if [ $? -ne 0 ]
@@ -238,7 +241,7 @@ function kind_up_cluster {
238241
}
239242

240243
function kind_load_images {
241-
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR}
244+
for image in ${IMAGE_ECHOSERVER} ${IMAGE_BUSY_BOX_LATEST} ${IMAGE_CURL} ${IMAGE_KUBEFLOW_OPERATOR} ${IMAGE_KUBERAY_OPERATOR} ${IMAGE_JOBSET_OPERATOR}
242245
do
243246
kind load docker-image ${image} ${CLUSTER_CONTEXT}
244247
if [ $? -ne 0 ]
@@ -267,6 +270,15 @@ function configure_cluster {
267270
echo -n "." && sleep 1;
268271
done
269272
echo ""
273+
274+
echo "Installing JobSet operator version $JOBSET_VERSION"
275+
kubectl apply --server-side -f "https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml"
276+
echo "Waiting for pods in the jobset namespace to become ready"
277+
while [[ $(kubectl get pods -n jobset-system -o 'jsonpath={..status.conditions[?(@.type=="Ready")].status}' | tr ' ' '\n' | sort -u) != "True" ]]
278+
do
279+
echo -n "." && sleep 1;
280+
done
281+
echo ""
270282
}
271283

272284
function wait_for_appwrapper_controller {

internal/controller/appwrapper/appwrapper_controller.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type componentStatusSummary struct {
8080
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch
8181
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/finalizers,verbs=update
8282

83-
// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters
83+
// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters, jobsets
8484

8585
//+kubebuilder:rbac:groups="",resources=pods;services,verbs=get;list;watch;create;update;patch;delete
8686
//+kubebuilder:rbac:groups=apps,resources=deployments;statefulsets,verbs=get;list;watch;create;update;patch;delete
@@ -89,6 +89,7 @@ type componentStatusSummary struct {
8989
//+kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=podgroups,verbs=get;list;watch;create;update;patch;delete
9090
//+kubebuilder:rbac:groups=kubeflow.org,resources=pytorchjobs,verbs=get;list;watch;create;update;patch;delete
9191
//+kubebuilder:rbac:groups=ray.io,resources=rayclusters;rayjobs,verbs=get;list;watch;create;update;patch;delete
92+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
9293

9394
// Reconcile reconciles an appwrapper
9495
// Please see [aw-states] for documentation of this method.

pkg/utils/utils.go

+21
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,27 @@ func InferPodSets(obj *unstructured.Unstructured) ([]workloadv1beta2.AppWrapperP
492492
}
493493
podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: "template.spec.template"})
494494

495+
case schema.GroupVersionKind{Group: "jobset.x-k8s.io", Version: "v1alpha2", Kind: "JobSet"}:
496+
if jobs, err := getValueAtPath(obj.UnstructuredContent(), "template.spec.replicatedJobs"); err == nil {
497+
if jobs, ok := jobs.([]interface{}); ok {
498+
for i := range jobs {
499+
jobSpecPrefix := fmt.Sprintf("template.spec.replicatedJobs[%v].", i)
500+
// validate path to replica template
501+
if _, err := getValueAtPath(obj.UnstructuredContent(), jobSpecPrefix+"template"); err == nil {
502+
var replicas int32 = 1
503+
if parallelism, err := GetReplicas(obj, jobSpecPrefix+"template.spec.parallelism"); err == nil {
504+
replicas = parallelism
505+
}
506+
if completions, err := GetReplicas(obj, jobSpecPrefix+"template.spec.completions"); err == nil && completions < replicas {
507+
replicas = completions
508+
}
509+
// infer replica count
510+
podSets = append(podSets, workloadv1beta2.AppWrapperPodSet{Replicas: ptr.To(replicas), Path: jobSpecPrefix + "template.spec.template"})
511+
}
512+
}
513+
}
514+
}
515+
495516
case schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}:
496517
for _, replicaType := range []string{"Master", "Worker"} {
497518
prefix := "template.spec.pytorchReplicaSpecs." + replicaType + "."

samples/wrapped-jobset.yaml

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
apiVersion: workload.codeflare.dev/v1beta2
2+
kind: AppWrapper
3+
metadata:
4+
name: sample-jobset
5+
labels:
6+
kueue.x-k8s.io/queue-name: default-queue
7+
spec:
8+
components:
9+
- template:
10+
apiVersion: jobset.x-k8s.io/v1alpha2
11+
kind: JobSet
12+
metadata:
13+
name: sample-jobset
14+
spec:
15+
replicatedJobs:
16+
- name: workers
17+
template:
18+
spec:
19+
parallelism: 4
20+
completions: 4
21+
backoffLimit: 0
22+
template:
23+
spec:
24+
restartPolicy: Never
25+
containers:
26+
- name: sleep
27+
image: quay.io/project-codeflare/busybox:1.36
28+
command: ["sh", "-c", "sleep 100"]
29+
resources:
30+
requests:
31+
cpu: 100m
32+
- name: driver
33+
template:
34+
spec:
35+
parallelism: 1
36+
completions: 1
37+
backoffLimit: 0
38+
template:
39+
spec:
40+
restartPolicy: Never
41+
containers:
42+
- name: sleep
43+
image: quay.io/project-codeflare/busybox:1.36
44+
command: ["sh", "-c", "sleep 100"]
45+
resources:
46+
requests:
47+
cpu: 100m

test/e2e/appwrapper_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,16 @@ var _ = Describe("AppWrapper E2E Test", func() {
104104
})
105105
})
106106

107-
// TODO: JobSets (would have to deploy JobSet controller on e2e test cluster)
107+
Describe("Creation of JobSet GVKs", Label("Kueue", "Standalone"), func() {
108+
It("JobSet", func() {
109+
aw := createAppWrapper(ctx, jobset(500))
110+
appwrappers = append(appwrappers, aw)
111+
// TODO: Need dev versions of kueue/jobset to get correct handling of ownership
112+
// Once those are released change the test to:
113+
// Expect(waitAWPodsReady(ctx, aw)).Should(Succeed())
114+
Eventually(AppWrapperPhase(ctx, aw), 15*time.Second).Should(Equal(workloadv1beta2.AppWrapperResuming))
115+
})
116+
})
108117

109118
Describe("Webhook Enforces AppWrapper Invariants", Label("Webhook"), func() {
110119
Context("Structural Invariants", func() {

test/e2e/fixtures_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,44 @@ func stuckInitBatchjob(milliCPU int64) workloadv1beta2.AppWrapperComponent {
336336
}
337337
}
338338

339+
const jobsetYAML = `
340+
apiVersion: jobset.x-k8s.io/v1alpha2
341+
kind: JobSet
342+
metadata:
343+
generateName: %v
344+
spec:
345+
replicatedJobs:
346+
- name: driver
347+
template:
348+
spec:
349+
parallelism: 1
350+
completions: 1
351+
backoffLimit: 0
352+
template:
353+
spec:
354+
restartPolicy: Never
355+
containers:
356+
- name: sleep
357+
image: quay.io/project-codeflare/busybox:1.36
358+
command: ["sh", "-c", "sleep 100"]
359+
resources:
360+
requests:
361+
cpu: %v
362+
`
363+
364+
func jobset(milliCPU int64) workloadv1beta2.AppWrapperComponent {
365+
yamlString := fmt.Sprintf(jobsetYAML,
366+
"jobset-",
367+
resource.NewMilliQuantity(milliCPU, resource.DecimalSI))
368+
369+
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
370+
Expect(err).NotTo(HaveOccurred())
371+
return workloadv1beta2.AppWrapperComponent{
372+
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Path: "template.spec.replicatedJobs[0].template.spec.template"}},
373+
Template: runtime.RawExtension{Raw: jsonBytes},
374+
}
375+
}
376+
339377
// This is not a useful PyTorchJob:
340378
// 1. Using a dummy busybox image to avoid pulling a large & rate-limited image from dockerhub
341379
// 2. We avoid needing the injected sidecar (alpine:3.10 from dockerhub) by not specifying a Master

0 commit comments

Comments
 (0)