diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index a796236d..cff7d182 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -101,6 +101,8 @@ jobs: fi make deploy-local minikube image load ghcr.io/flux-framework/flux-operator:test + VERSION=v0.1.3 + kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml kubectl apply -f examples/dist/flux-operator-local.yaml - name: Test ${{ matrix.test[0] }} diff --git a/.github/workflows/test-python.yaml b/.github/workflows/test-python.yaml index fad67bbd..f5abbf81 100644 --- a/.github/workflows/test-python.yaml +++ b/.github/workflows/test-python.yaml @@ -71,6 +71,8 @@ jobs: minikube ssh docker pull ${container} make deploy-local minikube image load ghcr.io/flux-framework/flux-operator:test + VERSION=v0.1.3 + kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml kubectl apply -f examples/dist/flux-operator-local.yaml - name: Test ${{ matrix.test[0] }} diff --git a/Makefile b/Makefile index 3570930f..82b0020b 100644 --- a/Makefile +++ b/Makefile @@ -165,6 +165,7 @@ clean: kubectl delete -n flux-operator pods --all --grace-period=0 --force kubectl delete -n flux-operator pvc --all --grace-period=0 --force kubectl delete -n flux-operator pv --all --grace-period=0 --force + kubectl delete -n flux-operator jobset --all --grace-period=0 --force kubectl delete -n flux-operator jobs --all --grace-period=0 --force kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force diff --git a/chart/templates/manager-rbac.yaml b/chart/templates/manager-rbac.yaml index 97462bd1..20fcf79d 100644 --- a/chart/templates/manager-rbac.yaml +++ b/chart/templates/manager-rbac.yaml @@ -273,6 +273,32 @@ rules: - patch - update - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/finalizers + verbs: + - update +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/status + verbs: + - get + - patch + - update - apiGroups: - networking.k8s.io resources: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 8cef0c43..4ccd30f8 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -273,6 +273,32 @@ rules: - patch - update - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/finalizers + verbs: + - update +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/status + verbs: + - get + - patch + - update - apiGroups: - networking.k8s.io resources: diff --git a/controllers/flux/containers.go b/controllers/flux/containers.go index 9ff8fe18..222ccbb2 100644 --- a/controllers/flux/containers.go +++ b/controllers/flux/containers.go @@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers( specs []api.MiniClusterContainer, defaultName string, mounts []corev1.VolumeMount, + entrypoint string, ) ([]corev1.Container, error) { // Create the containers for the pod @@ -45,7 +46,7 @@ func (r *MiniClusterReconciler) getContainers( if container.RunFlux { // wait.sh path corresponds to container identifier - waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i) + waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i) command = []string{"/bin/bash", waitScript, container.Command} containerName = defaultName } diff --git a/controllers/flux/job.go b/controllers/flux/job.go deleted file mode 100644 index 2e952f45..00000000 --- a/controllers/flux/job.go +++ /dev/null @@ -1,100 +0,0 @@ -/* -Copyright 2022-2023 Lawrence Livermore National Security, LLC - (c.f. AUTHORS, NOTICE.LLNS, COPYING) - -This is part of the Flux resource manager framework. -For details, see https://github.com/flux-framework. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package controllers - -import ( - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - - api "flux-framework/flux-operator/api/v1alpha1" -) - -// newMiniCluster is used to create the MiniCluster Job -func (r *MiniClusterReconciler) newMiniClusterJob( - cluster *api.MiniCluster, -) (*batchv1.Job, error) { - - // Number of retries before marking as failed - backoffLimit := int32(100) - completionMode := batchv1.IndexedCompletion - podLabels := r.getPodLabels(cluster) - setAsFQDN := false - - // This is an indexed-job - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name, - Namespace: cluster.Namespace, - Labels: cluster.Spec.JobLabels, - }, - - Spec: batchv1.JobSpec{ - - BackoffLimit: &backoffLimit, - Completions: &cluster.Spec.Size, - Parallelism: &cluster.Spec.Size, - CompletionMode: &completionMode, - ActiveDeadlineSeconds: &cluster.Spec.DeadlineSeconds, - - // Note there is parameter to limit runtime - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name, - Namespace: cluster.Namespace, - Labels: podLabels, - Annotations: cluster.Spec.Pod.Annotations, - }, - Spec: corev1.PodSpec{ - // matches the service - Subdomain: restfulServiceName, - SetHostnameAsFQDN: &setAsFQDN, - // Not currently in use, commented out for now - //Volumes: getVolumes(cluster), - RestartPolicy: corev1.RestartPolicyOnFailure, - ImagePullSecrets: getImagePullSecrets(cluster), - ServiceAccountName: cluster.Spec.Pod.ServiceAccountName, - NodeSelector: cluster.Spec.Pod.NodeSelector, - }}, - }, - } - - // Get resources for the pod - resources, err := r.getPodResources(cluster) - r.log.Info("🌀 MiniCluster", "Pod.Resources", resources) - if err != nil { - r.log.Info("🌀 MiniCluster", "Pod.Resources", resources) - return job, err - } - job.Spec.Template.Spec.Overhead = resources - - // Get volume mounts, add on container specific ones - mounts := getVolumeMounts(cluster) - containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts) - job.Spec.Template.Spec.Containers = containers - ctrl.SetControllerReference(cluster, job, r.Scheme) - return job, err -} - -// getImagePullSecrets returns a list of secret object references for each container. -func getImagePullSecrets(cluster *api.MiniCluster) []corev1.LocalObjectReference { - pullSecrets := []corev1.LocalObjectReference{} - for _, container := range cluster.Spec.Containers { - if container.ImagePullSecret != "" { - newSecret := corev1.LocalObjectReference{ - Name: container.ImagePullSecret, - } - pullSecrets = append(pullSecrets, newSecret) - } - } - return pullSecrets -} diff --git a/controllers/flux/jobset.go b/controllers/flux/jobset.go index 2bdfd6df..856a1f2b 100644 --- a/controllers/flux/jobset.go +++ b/controllers/flux/jobset.go @@ -17,6 +17,7 @@ import ( api "flux-framework/flux-operator/api/v1alpha1" + ctrl "sigs.k8s.io/controller-runtime" jobset "sigs.k8s.io/jobset/api/v1alpha1" ) @@ -24,16 +25,18 @@ func (r *MiniClusterReconciler) newJobSet( cluster *api.MiniCluster, ) (*jobset.JobSet, error) { - suspend := true + // When suspend is true we have a hard time debugging jobs, so keep false + suspend := false jobs := jobset.JobSet{ ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name, + Name: "minicluster", Namespace: cluster.Namespace, Labels: cluster.Spec.JobLabels, }, Spec: jobset.JobSetSpec{ - // Suspend child jobs (the worker pods) when broker finishes + // This might be the control for child jobs (worker) + // But I don't think we need this anymore. Suspend: &suspend, // TODO decide on FailurePolicy here // default is to fail if all jobs in jobset fail @@ -41,8 +44,10 @@ func (r *MiniClusterReconciler) newJobSet( } // Get leader broker job, the parent in the JobSet (worker or follower pods) + // Both are required to be in indexed completion mode to have a service! + // I'm not sure that totally makes sense, will suggest a change. // cluster, size, entrypoint, indexed - leaderJob, err := r.getJob(cluster, 1, "broker", false) + leaderJob, err := r.getJob(cluster, 1, "broker", true) if err != nil { return &jobs, err } @@ -51,10 +56,11 @@ func (r *MiniClusterReconciler) newJobSet( return &jobs, err } jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob} + ctrl.SetControllerReference(cluster, &jobs, r.Scheme) return &jobs, nil } -// getBrokerJob creates the job for the main leader broker +// getJob creates a job for a main leader (broker) or worker (followers) func (r *MiniClusterReconciler) getJob( cluster *api.MiniCluster, size int32, @@ -64,18 +70,19 @@ func (r *MiniClusterReconciler) getJob( backoffLimit := int32(100) podLabels := r.getPodLabels(cluster) - enableDNSHostnames := true + enableDNSHostnames := false completionMode := batchv1.NonIndexedCompletion if indexed { completionMode = batchv1.IndexedCompletion } - // TODO how are these named job := jobset.ReplicatedJob{ Name: cluster.Name + "-" + entrypoint, - // Allow pods to be reached by their hostnames! A simple boolean! Chef's kiss! + // This would allow pods to be reached by their hostnames! + // It doesn't work for the Flux broker config at the moment, + // but could if we are allowed to specify the service name. // ---.- Network: &jobset.Network{ EnableDNSHostnames: &enableDNSHostnames, @@ -110,7 +117,7 @@ func (r *MiniClusterReconciler) getJob( }, Spec: corev1.PodSpec{ // matches the service - // Subdomain: restfulServiceName, + Subdomain: restfulServiceName, Volumes: getVolumes(cluster, entrypoint), RestartPolicy: corev1.RestartPolicyOnFailure, ImagePullSecrets: getImagePullSecrets(cluster), @@ -130,7 +137,12 @@ func (r *MiniClusterReconciler) getJob( // Get volume mounts, add on container specific ones mounts := getVolumeMounts(cluster) - containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts) + containers, err := r.getContainers( + cluster.Spec.Containers, + cluster.Name, + mounts, + entrypoint, + ) jobspec.Template.Spec.Containers = containers job.Template.Spec = jobspec return job, err diff --git a/controllers/flux/minicluster.go b/controllers/flux/minicluster.go index 78a26f9e..0a042eb1 100644 --- a/controllers/flux/minicluster.go +++ b/controllers/flux/minicluster.go @@ -89,13 +89,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster( } // Create headless service for the MiniCluster - // We should not technically need this anymore. - // TODO I need to test the cluster, but I can't get the JobSet working - //selector := map[string]string{"job-name": cluster.Name} - //result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector) - //if err != nil { - // return result, err - //} + selector := map[string]string{"job-group": cluster.Name} + result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector) + if err != nil { + return result, err + } // Create the batch job that brings it all together! // A batchv1.Job can hold a spec for containers that use the configs we just made @@ -452,16 +450,19 @@ func (r *MiniClusterReconciler) getConfigMap( func generateHostlist(cluster *api.MiniCluster, size int) string { // The hosts are generated through the max size, so the cluster can expand - return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size)) + // minicluster-flux-sample-broker-0-0 + // minicluster-flux-sample-worker-0-1 through 0-3 for a size 4 cluster + return fmt.Sprintf("minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]", cluster.Name, cluster.Name, generateRange(size-1)) } // generateFluxConfig creates the broker.toml file used to boostrap flux func generateFluxConfig(cluster *api.MiniCluster) string { // The hosts are generated through the max size, so the cluster can expand + brokerFqdn := fmt.Sprintf("minicluster-%s-broker-0-0", cluster.Name) fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace) - hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize))) - fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, cluster.Name, hosts) + hosts := fmt.Sprintf("%s, minicluster-%s-worker-0-[%s]", brokerFqdn, cluster.Name, generateRange(int(cluster.Spec.MaxSize-1))) + fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, hosts) fluxConfig += "\n" + brokerArchiveSection return fluxConfig } diff --git a/controllers/flux/minicluster_controller.go b/controllers/flux/minicluster_controller.go index 26293aa8..46c8735e 100644 --- a/controllers/flux/minicluster_controller.go +++ b/controllers/flux/minicluster_controller.go @@ -89,6 +89,9 @@ func NewMiniClusterReconciler( //+kubebuilder:rbac:groups=networking.k8s.io,resources="ingresses",verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update +//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update //+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;exec //+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;list;watch;create;update;patch;delete;exec diff --git a/controllers/flux/pods.go b/controllers/flux/pods.go index b54deb9e..3656434e 100644 --- a/controllers/flux/pods.go +++ b/controllers/flux/pods.go @@ -28,6 +28,8 @@ func (r *MiniClusterReconciler) getPodLabels(cluster *api.MiniCluster) map[strin podLabels := cluster.Spec.Pod.Labels podLabels["namespace"] = cluster.Namespace podLabels["app.kubernetes.io/name"] = cluster.Name + // This will select the service + podLabels["job-group"] = cluster.Name return podLabels } @@ -109,9 +111,6 @@ func (r *MiniClusterReconciler) newServicePod( podLabels := r.getPodLabels(cluster) podServiceName := cluster.Name + "-services" - // service selector? - podLabels["job-name"] = cluster.Name - // This is an indexed-job pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -132,7 +131,9 @@ func (r *MiniClusterReconciler) newServicePod( }, } mounts := []corev1.VolumeMount{} - containers, err := r.getContainers(cluster.Spec.Services, podServiceName, mounts) + + // A service container has no entrypoint for a flux runner (empty string) + containers, err := r.getContainers(cluster.Spec.Services, podServiceName, mounts, "") if err != nil { return pod, err } @@ -140,3 +141,17 @@ func (r *MiniClusterReconciler) newServicePod( ctrl.SetControllerReference(cluster, pod, r.Scheme) return pod, err } + +// getImagePullSecrets returns a list of secret object references for each container. +func getImagePullSecrets(cluster *api.MiniCluster) []corev1.LocalObjectReference { + pullSecrets := []corev1.LocalObjectReference{} + for _, container := range cluster.Spec.Containers { + if container.ImagePullSecret != "" { + newSecret := corev1.LocalObjectReference{ + Name: container.ImagePullSecret, + } + pullSecrets = append(pullSecrets, newSecret) + } + } + return pullSecrets +} diff --git a/controllers/flux/templates/broker.toml b/controllers/flux/templates/broker.toml index e6f40fba..a8584c56 100644 --- a/controllers/flux/templates/broker.toml +++ b/controllers/flux/templates/broker.toml @@ -16,5 +16,5 @@ default_port = 8050 default_bind = "tcp://eth0:%%p" default_connect = "tcp://%%h.%s:%%p" hosts = [ - { host="%s-%s"}, + { host="%s"}, ] diff --git a/docs/tutorials/jobset.md b/docs/tutorials/jobset.md index f966a228..58778c0d 100644 --- a/docs/tutorials/jobset.md +++ b/docs/tutorials/jobset.md @@ -26,8 +26,39 @@ $ kubectl apply -f examples/dist/flux-operator-dev.yaml $ kubectl apply -f examples/tests/jobset/minicluster.yaml ``` -This is a WIP! The JobSet (when installed as above) isn't seen by the operator: +And then you can see the pods: ```bash -2023-05-09T00:09:51Z ERROR Reconciler error {"controller": "minicluster", "controllerGroup": "flux-framework.org", "controllerKind": "MiniCluster", "MiniCluster": {"name":"flux-sample","namespace":"flux-operator"}, "namespace": "flux-operator", "name": "flux-sample", "reconcileID": "1db5223d-1f8a-4e49-a219-8d3ba55de826", "error": "no kind is registered for the type v1alpha1.JobSet in scheme \"pkg/runtime/scheme.go:100\""} +kubectl get -n flux-operator pods +NAME READY STATUS RESTARTS AGE +minicluster-flux-sample-broker-0-0-fsv4p 1/1 Running 0 12m +minicluster-flux-sample-worker-0-0-tjg69 1/1 Running 0 12m +minicluster-flux-sample-worker-0-1-hwmbb 1/1 Running 0 12m +minicluster-flux-sample-worker-0-2-q246d 1/1 Running 0 12m +``` + +And that the broker is running and ready for dootie! I mean duty! :D + +``` +✨ Curve certificate generated by helper pod +# **** Generated on 2023-04-26 22:54:42 by CZMQ **** +# ZeroMQ CURVE **Secret** Certificate +# DO NOT PROVIDE THIS FILE TO OTHER USERS nor change its permissions. + +metadata + name = "flux-cert-generator" + keygen.hostname = "flux-sample-0" +curve + public-key = "WCU!!@2t4>.:Khqg%bNFN#.lf*Eh)vlbVx^@s-is" + secret-key = "NXvRbbIU9KZ?&64Y#)09@zxSw20VT.FfH(J/sJ1-" + +🌀 flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d -Stbon.fanout=256 -Srundir=/run/flux -Sstatedir=/var/lib/flux -Slocal-uri=local:///run/flux/local -Slog-stderr-level=6 -Slog-stderr-mode=local +broker.info[2]: start: none->join 0.434417ms +broker.info[2]: parent-ready: join->init 0.442597s +broker.info[2]: configuration updated +broker.info[2]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion +broker.info[2]: rc1.0: running /etc/flux/rc1.d/02-cron +broker.info[2]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.1s +broker.info[2]: rc1-success: init->quorum 0.12074s +broker.info[2]: quorum-full: quorum->run 0.201772s ``` \ No newline at end of file diff --git a/examples/dist/flux-operator.yaml b/examples/dist/flux-operator.yaml index fe82f2d9..1915c20f 100644 --- a/examples/dist/flux-operator.yaml +++ b/examples/dist/flux-operator.yaml @@ -1047,6 +1047,32 @@ rules: - patch - update - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/finalizers + verbs: + - update +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets/status + verbs: + - get + - patch + - update - apiGroups: - networking.k8s.io resources: diff --git a/examples/tests/jobset/minicluster.yaml b/examples/tests/jobset/minicluster.yaml index df185359..1b6a8686 100644 --- a/examples/tests/jobset/minicluster.yaml +++ b/examples/tests/jobset/minicluster.yaml @@ -4,16 +4,10 @@ metadata: name: flux-sample namespace: flux-operator spec: - # suppress all output except for test run - logging: - quiet: true - # Number of pods to create for MiniCluster size: 4 # This is a list because a pod can support multiple containers containers: - image: ghcr.io/flux-framework/flux-restful-api:latest - command: echo hello world - commands: - post: sleep infinity + command: sleep infinity \ No newline at end of file diff --git a/main.go b/main.go index a0bb2fe1..2b8999fd 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,8 @@ import ( api "flux-framework/flux-operator/api/v1alpha1" + jobset "sigs.k8s.io/jobset/api/v1alpha1" + "flux-framework/flux-operator/controllers/core" //+kubebuilder:scaffold:imports ) @@ -44,6 +46,8 @@ func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(api.AddToScheme(scheme)) + utilruntime.Must(jobset.AddToScheme(scheme)) + //+kubebuilder:scaffold:scheme }