Commit

first go of jobset working
We were not able to use the JobSet networking, so we fell back to our
custom headless service with a job-group selector.

Signed-off-by: vsoch <[email protected]>
vsoch committed May 9, 2023
1 parent b78ae56 commit 8a17f38
Showing 16 changed files with 179 additions and 135 deletions.
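The fallback pairs a headless Kubernetes service with a job-group label selector, so the broker and worker pods can resolve one another through plain pod DNS records. Below is a minimal sketch of such a service, assuming `exposeServices` (invoked in `minicluster.go` further down) constructs roughly this object; the helper's actual body is not part of this diff:

```go
import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	api "flux-framework/flux-operator/api/v1alpha1"
)

// headlessService sketches the fallback service: ClusterIP "None" makes it
// headless, so DNS returns individual pod records instead of a virtual IP,
// and the job-group selector matches every pod in the MiniCluster.
func headlessService(cluster *api.MiniCluster, serviceName string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      serviceName,
			Namespace: cluster.Namespace,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "None",
			Selector:  map[string]string{"job-group": cluster.Name},
		},
	}
}
```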
2 changes: 2 additions & 0 deletions .github/workflows/main.yaml
@@ -101,6 +101,8 @@ jobs:
fi
make deploy-local
minikube image load ghcr.io/flux-framework/flux-operator:test
+VERSION=v0.1.3
+kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
kubectl apply -f examples/dist/flux-operator-local.yaml
- name: Test ${{ matrix.test[0] }}
2 changes: 2 additions & 0 deletions .github/workflows/test-python.yaml
@@ -71,6 +71,8 @@ jobs:
minikube ssh docker pull ${container}
make deploy-local
minikube image load ghcr.io/flux-framework/flux-operator:test
+VERSION=v0.1.3
+kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
kubectl apply -f examples/dist/flux-operator-local.yaml
- name: Test ${{ matrix.test[0] }}
1 change: 1 addition & 0 deletions Makefile
@@ -165,6 +165,7 @@ clean:
kubectl delete -n flux-operator pods --all --grace-period=0 --force
kubectl delete -n flux-operator pvc --all --grace-period=0 --force
kubectl delete -n flux-operator pv --all --grace-period=0 --force
+kubectl delete -n flux-operator jobset --all --grace-period=0 --force
kubectl delete -n flux-operator jobs --all --grace-period=0 --force
kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force

26 changes: 26 additions & 0 deletions chart/templates/manager-rbac.yaml
@@ -273,6 +273,32 @@ rules:
  - patch
  - update
  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
- apiGroups:
  - networking.k8s.io
  resources:
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
@@ -273,6 +273,32 @@ rules:
  - patch
  - update
  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
- apiGroups:
  - networking.k8s.io
  resources:
3 changes: 2 additions & 1 deletion controllers/flux/containers.go
@@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers(
specs []api.MiniClusterContainer,
defaultName string,
mounts []corev1.VolumeMount,
+entrypoint string,
) ([]corev1.Container, error) {

// Create the containers for the pod
@@ -45,7 +46,7 @@
if container.RunFlux {

// wait.sh path corresponds to container identifier
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i)
command = []string{"/bin/bash", waitScript, container.Command}
containerName = defaultName
}
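Threading `entrypoint` through `getContainers` means the broker and worker jobs now resolve different generated scripts instead of a single `wait-%d.sh`. A quick illustration of the format string (the index values are hypothetical):

```go
package main

import "fmt"

func main() {
	// Each replicated job resolves its own script per container index.
	fmt.Printf("/flux_operator/%s-%d.sh\n", "broker", 0) // /flux_operator/broker-0.sh
	fmt.Printf("/flux_operator/%s-%d.sh\n", "worker", 0) // /flux_operator/worker-0.sh
}
```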
100 changes: 0 additions & 100 deletions controllers/flux/job.go

This file was deleted.

32 changes: 22 additions & 10 deletions controllers/flux/jobset.go
@@ -17,32 +17,37 @@ import (

api "flux-framework/flux-operator/api/v1alpha1"

ctrl "sigs.k8s.io/controller-runtime"
jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

func (r *MiniClusterReconciler) newJobSet(
cluster *api.MiniCluster,
) (*jobset.JobSet, error) {

-suspend := true
+// When suspend is true we have a hard time debugging jobs, so keep false
+suspend := false
jobs := jobset.JobSet{
ObjectMeta: metav1.ObjectMeta{
-Name: cluster.Name,
+Name: "minicluster",
Namespace: cluster.Namespace,
Labels: cluster.Spec.JobLabels,
},
Spec: jobset.JobSetSpec{

-// Suspend child jobs (the worker pods) when broker finishes
+// This might be the control for child jobs (worker)
+// But I don't think we need this anymore.
Suspend: &suspend,
// TODO decide on FailurePolicy here
// default is to fail if all jobs in jobset fail
},
}

// Get leader broker job, the parent in the JobSet (worker or follower pods)
+// Both are required to be in indexed completion mode to have a service!
+// I'm not sure that totally makes sense, will suggest a change.
// cluster, size, entrypoint, indexed
-leaderJob, err := r.getJob(cluster, 1, "broker", false)
+leaderJob, err := r.getJob(cluster, 1, "broker", true)
if err != nil {
return &jobs, err
}
@@ -51,10 +56,11 @@
return &jobs, err
}
jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
+ctrl.SetControllerReference(cluster, &jobs, r.Scheme)
return &jobs, nil
}

-// getBrokerJob creates the job for the main leader broker
+// getJob creates a job for a main leader (broker) or worker (followers)
func (r *MiniClusterReconciler) getJob(
cluster *api.MiniCluster,
size int32,
Expand All @@ -64,18 +70,19 @@ func (r *MiniClusterReconciler) getJob(

backoffLimit := int32(100)
podLabels := r.getPodLabels(cluster)
-enableDNSHostnames := true
+enableDNSHostnames := false
completionMode := batchv1.NonIndexedCompletion

if indexed {
completionMode = batchv1.IndexedCompletion
}

// TODO how are these named
job := jobset.ReplicatedJob{
Name: cluster.Name + "-" + entrypoint,

-// Allow pods to be reached by their hostnames! A simple boolean! Chef's kiss!
+// This would allow pods to be reached by their hostnames!
+// It doesn't work for the Flux broker config at the moment,
+// but could if we are allowed to specify the service name.
// <jobSet.name>-<spec.replicatedJob.name>-<job-index>-<pod-index>.<jobSet.name>-<spec.replicatedJob.name>
Network: &jobset.Network{
EnableDNSHostnames: &enableDNSHostnames,
@@ -110,7 +117,7 @@
},
Spec: corev1.PodSpec{
// matches the service
-// Subdomain: restfulServiceName,
+Subdomain: restfulServiceName,
Volumes: getVolumes(cluster, entrypoint),
RestartPolicy: corev1.RestartPolicyOnFailure,
ImagePullSecrets: getImagePullSecrets(cluster),
Expand All @@ -130,7 +137,12 @@ func (r *MiniClusterReconciler) getJob(

// Get volume mounts, add on container specific ones
mounts := getVolumeMounts(cluster)
-containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts)
+containers, err := r.getContainers(
+cluster.Spec.Containers,
+cluster.Name,
+mounts,
+entrypoint,
+)
jobspec.Template.Spec.Containers = containers
job.Template.Spec = jobspec
return job, err
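For reference, here is how the JobSet hostname pattern quoted in the comment above expands for a MiniCluster named `flux-sample`; the JobSet name `minicluster` and the `cluster.Name + "-" + entrypoint` replicated-job naming both come from `newJobSet` and `getJob` in this diff:

```go
package main

import "fmt"

func main() {
	// Pattern: <jobSet.name>-<spec.replicatedJob.name>-<job-index>-<pod-index>
	jobSetName := "minicluster"           // set in newJobSet
	replicatedJob := "flux-sample-broker" // cluster.Name + "-" + entrypoint in getJob
	jobIndex, podIndex := 0, 0

	fmt.Printf("%s-%s-%d-%d\n", jobSetName, replicatedJob, jobIndex, podIndex)
	// Output: minicluster-flux-sample-broker-0-0
}
```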
21 changes: 11 additions & 10 deletions controllers/flux/minicluster.go
@@ -89,13 +89,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
}

// Create headless service for the MiniCluster
-// We should not technically need this anymore.
-// TODO I need to test the cluster, but I can't get the JobSet working
-//selector := map[string]string{"job-name": cluster.Name}
-//result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
-//if err != nil {
-// return result, err
-//}
+selector := map[string]string{"job-group": cluster.Name}
+result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
+if err != nil {
+return result, err
+}

// Create the batch job that brings it all together!
// A batchv1.Job can hold a spec for containers that use the configs we just made
@@ -452,16 +450,19 @@
func generateHostlist(cluster *api.MiniCluster, size int) string {

// The hosts are generated through the max size, so the cluster can expand
return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size))
// minicluster-flux-sample-broker-0-0
// minicluster-flux-sample-worker-0-1 through 0-3 for a size 4 cluster
return fmt.Sprintf("minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]", cluster.Name, cluster.Name, generateRange(size-1))
}

// generateFluxConfig creates the broker.toml file used to bootstrap flux
func generateFluxConfig(cluster *api.MiniCluster) string {

// The hosts are generated through the max size, so the cluster can expand
+brokerFqdn := fmt.Sprintf("minicluster-%s-broker-0-0", cluster.Name)
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace)
-hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize)))
-fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, cluster.Name, hosts)
+hosts := fmt.Sprintf("%s, minicluster-%s-worker-0-[%s]", brokerFqdn, cluster.Name, generateRange(int(cluster.Spec.MaxSize-1)))
+fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, hosts)
fluxConfig += "\n" + brokerArchiveSection
return fluxConfig
}
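Putting the new naming together for a size-4 MiniCluster named `flux-sample`, `generateHostlist` emits one broker host plus a bracketed worker range. `generateRange` itself is not shown in this diff (and the comment above suggests workers 0-1 through 0-3), so the zero-based stand-in below is an assumption about how the range is rendered:

```go
package main

import "fmt"

// generateRangeSketch stands in for the operator's generateRange helper,
// which this diff does not show; we assume a zero-based dash range,
// e.g. generateRangeSketch(3) == "0-2".
func generateRangeSketch(n int) string {
	return fmt.Sprintf("0-%d", n-1)
}

func main() {
	name, size := "flux-sample", 4
	hostlist := fmt.Sprintf(
		"minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]",
		name, name, generateRangeSketch(size-1),
	)
	fmt.Println(hostlist)
	// minicluster-flux-sample-broker-0-0,minicluster-flux-sample-worker-0-[0-2]
}
```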
3 changes: 3 additions & 0 deletions controllers/flux/minicluster_controller.go
@@ -89,6 +89,9 @@ func NewMiniClusterReconciler(
//+kubebuilder:rbac:groups=networking.k8s.io,resources="ingresses",verbs=get;list;watch;create;update;patch;delete

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;exec
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;list;watch;create;update;patch;delete;exec
