first go of jobset working

we were not able to use the jobset networking, so we fell back to our
custom headless service with a job-group selector

Signed-off-by: vsoch <[email protected]>
vsoch committed May 9, 2023
1 parent b78ae56 commit 37c6b37
Showing 16 changed files with 176 additions and 133 deletions.
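The fallback described in the commit message is a headless Service that selects pods by a job-group label. The exposeServices helper that builds it is not part of this diff; the following is a minimal sketch of the kind of object involved, assuming standard client-go types (the function name and shape are illustrative, not repo code):

// Hypothetical sketch of the headless Service the commit message refers to;
// the selector matches the one set in minicluster.go below.
import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func headlessService(name, namespace, clusterName string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: corev1.ServiceSpec{
			// Headless: no virtual IP; DNS returns a record per selected pod,
			// which lets Flux address the broker and each worker individually.
			ClusterIP: "None",
			Selector:  map[string]string{"job-group": clusterName},
		},
	}
}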
2 changes: 2 additions & 0 deletions .github/workflows/main.yaml
@@ -101,6 +101,8 @@ jobs:
   fi
   make deploy-local
   minikube image load ghcr.io/flux-framework/flux-operator:test
+  VERSION=v0.1.3
+  kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
   kubectl apply -f examples/dist/flux-operator-local.yaml
 - name: Test ${{ matrix.test[0] }}
2 changes: 2 additions & 0 deletions .github/workflows/test-python.yaml
@@ -71,6 +71,8 @@ jobs:
   minikube ssh docker pull ${container}
   make deploy-local
   minikube image load ghcr.io/flux-framework/flux-operator:test
+  VERSION=v0.1.3
+  kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
   kubectl apply -f examples/dist/flux-operator-local.yaml
 - name: Test ${{ matrix.test[0] }}
1 change: 1 addition & 0 deletions Makefile
@@ -165,6 +165,7 @@ clean:
 	kubectl delete -n flux-operator pods --all --grace-period=0 --force
 	kubectl delete -n flux-operator pvc --all --grace-period=0 --force
 	kubectl delete -n flux-operator pv --all --grace-period=0 --force
+	kubectl delete -n flux-operator jobset --all --grace-period=0 --force
 	kubectl delete -n flux-operator jobs --all --grace-period=0 --force
 	kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force
26 changes: 26 additions & 0 deletions chart/templates/manager-rbac.yaml
@@ -273,6 +273,32 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - networking.k8s.io
   resources:
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
@@ -273,6 +273,32 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - networking.k8s.io
   resources:
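RBAC lets the operator's service account manage JobSets, but the controller also needs the JobSet types registered in its runtime scheme before the client can serialize them; that registration presumably lives in main.go, which is among the changed files not rendered here. A sketch of the usual pattern, under that assumption:

// Hypothetical scheme registration (not shown in this diff): the JobSet
// API group must be added to the manager's scheme for r.Create to work.
import (
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

var scheme = runtime.NewScheme()

func init() {
	utilruntime.Must(jobset.AddToScheme(scheme))
}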
3 changes: 2 additions & 1 deletion controllers/flux/containers.go
@@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers(
 	specs []api.MiniClusterContainer,
 	defaultName string,
 	mounts []corev1.VolumeMount,
+	entrypoint string,
 ) ([]corev1.Container, error) {

 	// Create the containers for the pod
@@ -45,7 +46,7 @@
 		if container.RunFlux {

 			// wait.sh path corresponds to container identifier
-			waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
+			waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i)
 			command = []string{"/bin/bash", waitScript, container.Command}
 			containerName = defaultName
 		}
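The new entrypoint argument threads the job role into the wait script path, so broker and worker pods can run different generated scripts. A quick illustration of the resulting paths (illustrative values, not repo code):

// For the broker job's first container this yields "/flux_operator/broker-0.sh";
// worker pods source "/flux_operator/worker-0.sh".
waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", "broker", 0)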
100 changes: 0 additions & 100 deletions controllers/flux/job.go

This file was deleted.

27 changes: 19 additions & 8 deletions controllers/flux/jobset.go
@@ -17,32 +17,38 @@ import (

 	api "flux-framework/flux-operator/api/v1alpha1"

+	ctrl "sigs.k8s.io/controller-runtime"
 	jobset "sigs.k8s.io/jobset/api/v1alpha1"
 )

 func (r *MiniClusterReconciler) newJobSet(
 	cluster *api.MiniCluster,
 ) (*jobset.JobSet, error) {

-	suspend := true
+	// I don't really understand how this works, but it seems to be
+	// not creating any pods? So bad idea?
+	suspend := false
 	jobs := jobset.JobSet{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      cluster.Name,
+			Name:      "minicluster",
 			Namespace: cluster.Namespace,
 			Labels:    cluster.Spec.JobLabels,
 		},
 		Spec: jobset.JobSetSpec{

 			// Suspend child jobs (the worker pods) when broker finishes
+			// How do I define a child job?
 			Suspend: &suspend,
 			// TODO decide on FailurePolicy here
 			// default is to fail if all jobs in jobset fail
 		},
 	}

 	// Get leader broker job, the parent in the JobSet; workers are the follower pods
+	// Both are required to be in indexed completion mode to have a service!
+	// I'm not sure that totally makes sense, but ok!
 	// cluster, size, entrypoint, indexed
-	leaderJob, err := r.getJob(cluster, 1, "broker", false)
+	leaderJob, err := r.getJob(cluster, 1, "broker", true)
 	if err != nil {
 		return &jobs, err
 	}
@@ -51,10 +57,11 @@
 		return &jobs, err
 	}
 	jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
+	ctrl.SetControllerReference(cluster, &jobs, r.Scheme)
 	return &jobs, nil
 }
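The added ctrl.SetControllerReference call makes the MiniCluster the owner of the JobSet, so deleting a MiniCluster garbage-collects the JobSet and its child jobs. The create path is not in this diff; a minimal sketch of the usual controller-runtime pattern for an owned object (the helper name is hypothetical, and it assumes the reconciler embeds a controller-runtime client):

// Hypothetical helper, not code from this commit.
import (
	"context"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

func (r *MiniClusterReconciler) ensureJobSet(ctx context.Context, js *jobset.JobSet) error {
	existing := &jobset.JobSet{}
	err := r.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, existing)
	if errors.IsNotFound(err) {
		// The owner reference set in newJobSet means this JobSet is
		// garbage-collected when its MiniCluster is deleted.
		return r.Create(ctx, js)
	}
	return err
}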

-// getBrokerJob creates the job for the main leader broker
+// getJob creates a job for a main leader (broker) or worker (followers)
 func (r *MiniClusterReconciler) getJob(
 	cluster *api.MiniCluster,
 	size int32,
@@ -64,14 +71,13 @@

 	backoffLimit := int32(100)
 	podLabels := r.getPodLabels(cluster)
-	enableDNSHostnames := true
+	enableDNSHostnames := false
 	completionMode := batchv1.NonIndexedCompletion

 	if indexed {
 		completionMode = batchv1.IndexedCompletion
 	}

-	// TODO how are these named
 	job := jobset.ReplicatedJob{
 		Name: cluster.Name + "-" + entrypoint,

@@ -110,7 +116,7 @@
 			},
 			Spec: corev1.PodSpec{
 				// matches the service
-				// Subdomain: restfulServiceName,
+				Subdomain:        restfulServiceName,
 				Volumes:          getVolumes(cluster, entrypoint),
 				RestartPolicy:    corev1.RestartPolicyOnFailure,
 				ImagePullSecrets: getImagePullSecrets(cluster),
@@ -130,7 +136,12 @@

 	// Get volume mounts, add on container specific ones
 	mounts := getVolumeMounts(cluster)
-	containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts)
+	containers, err := r.getContainers(
+		cluster.Spec.Containers,
+		cluster.Name,
+		mounts,
+		entrypoint,
+	)
 	jobspec.Template.Spec.Containers = containers
 	job.Template.Spec = jobspec
 	return job, err
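Uncommenting Subdomain wires each pod into the headless Service's DNS: a pod whose subdomain matches the Service name resolves at <hostname>.<service>.<namespace>.svc.cluster.local, and indexed Jobs set each pod's hostname to its indexed name. A sketch of the broker address this produces, assuming JobSet's <jobset>-<replicatedJob>-<jobIndex>-<podIndex> naming and illustrative service and namespace names (the real service name is the restfulServiceName constant, which this diff does not show):

// Illustrative values only, not repo code.
hostname := fmt.Sprintf("minicluster-%s-broker-0-0", "flux-sample")
fqdn := fmt.Sprintf("%s.%s.%s.svc.cluster.local", hostname, "flux-service", "flux-operator")
// => minicluster-flux-sample-broker-0-0.flux-service.flux-operator.svc.cluster.local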
21 changes: 11 additions & 10 deletions controllers/flux/minicluster.go
@@ -89,13 +89,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
 	}

 	// Create headless service for the MiniCluster
-	// We should not technically need this anymore.
-	// TODO I need to test the cluster, but I can't get the JobSet working
-	//selector := map[string]string{"job-name": cluster.Name}
-	//result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
-	//if err != nil {
-	//	return result, err
-	//}
+	selector := map[string]string{"job-group": cluster.Name}
+	result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
+	if err != nil {
+		return result, err
+	}

 	// Create the batch job that brings it all together!
 	// A batchv1.Job can hold a spec for containers that use the configs we just made
@@ -452,16 +450,19 @@
 func generateHostlist(cluster *api.MiniCluster, size int) string {

 	// The hosts are generated through the max size, so the cluster can expand
-	return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size))
+	// minicluster-flux-sample-broker-0-0
+	// minicluster-flux-sample-worker-0-1 through 0-3 for a size 4 cluster
+	return fmt.Sprintf("minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]", cluster.Name, cluster.Name, generateRange(size-1))
 }
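For a concrete sense of the new hostlist shape, here is the output for a hypothetical size-4 MiniCluster named flux-sample; the exact worker index range depends on generateRange, which this diff does not show:

// Hypothetical output, assuming generateRange(3) renders as "0-2".
hostlist := fmt.Sprintf(
	"minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]",
	"flux-sample", "flux-sample", "0-2",
)
// => "minicluster-flux-sample-broker-0-0,minicluster-flux-sample-worker-0-[0-2]"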

 // generateFluxConfig creates the broker.toml file used to bootstrap flux
 func generateFluxConfig(cluster *api.MiniCluster) string {

 	// The hosts are generated through the max size, so the cluster can expand
+	brokerFqdn := fmt.Sprintf("minicluster-%s-broker-0-0", cluster.Name)
 	fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace)
-	hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize)))
-	fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, cluster.Name, hosts)
+	hosts := fmt.Sprintf("%s, minicluster-%s-worker-0-[%s]", brokerFqdn, cluster.Name, generateRange(int(cluster.Spec.MaxSize-1)))
+	fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, hosts)
 	fluxConfig += "\n" + brokerArchiveSection
 	return fluxConfig
 }
3 changes: 3 additions & 0 deletions controllers/flux/minicluster_controller.go
@@ -89,6 +89,9 @@ func NewMiniClusterReconciler(
 //+kubebuilder:rbac:groups=networking.k8s.io,resources="ingresses",verbs=get;list;watch;create;update;patch;delete

 //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
 //+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;exec
 //+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;list;watch;create;update;patch;delete;exec