From 31add861f98fe4c384f3f8d43b125dfe5ef8487f Mon Sep 17 00:00:00 2001 From: kuizhiqing Date: Mon, 19 Feb 2024 12:41:32 +0000 Subject: [PATCH] use ptr.Deref Signed-off-by: kuizhiqing --- go.mod | 2 +- go.sum | 4 ++-- pkg/apis/kubeflow/v2beta1/types.go | 2 +- pkg/controller/mpi_job_controller.go | 6 ++++-- pkg/controller/mpi_job_controller_test.go | 7 ++++--- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 4fb934f2..55361ccd 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( k8s.io/code-generator v0.27.4 k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f - k8s.io/utils v0.0.0-20230209194617-a36077c30491 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e sigs.k8s.io/controller-runtime v0.15.1 sigs.k8s.io/scheduler-plugins v0.26.7 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 diff --git a/go.sum b/go.sum index d0bb6f57..01dede9e 100644 --- a/go.sum +++ b/go.sum @@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= -k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= -k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0= sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c= diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 65a09858..5480d842 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -154,7 +154,7 @@ type MPIJobSpec struct { // +kubebuilder:default:=1 SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"` - // RunLauncherAsWorker indicates wether to run worker process in launcher + // RunLauncherAsWorker indicates whether to run worker process in launcher // Defaults to false. // +optional // +kubebuilder:default:=false diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index ecc98fa2..d0dcca90 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -54,6 +54,7 @@ import ( "k8s.io/klog" "k8s.io/utils/clock" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" @@ -656,10 +657,11 @@ func (c *MPIJobController) syncHandler(key string) error { return err } } + // If we want to run process in launcher, we should create a service for launcher. // The Intel and MPICH implementations require workers to communicate with the // launcher through its hostname. For that, we create a Service which // has the same name as the launcher's hostname. - if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel || mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH { if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil { @@ -1335,7 +1337,7 @@ func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflo buffer.WriteString("#!/bin/sh\n") // We don't check if launcher is running here, launcher should always be there or the job failed - if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker { + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) { launcherService := mpiJob.Name + launcherSuffix buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace)) } diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 2186fac4..cc157aa8 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/utils/clock" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -139,7 +140,7 @@ func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubef func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { mpiJob := newMPIJobCommon(name, startTime, completionTime) - if *replicas > 0 { + if ptr.Deref(replicas, 0) > 0 { mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] = &kubeflow.ReplicaSpec{ Template: corev1.PodTemplateSpec{ @@ -529,7 +530,7 @@ func TestAllResourcesCreated(t *testing.T) { for i := 0; i < 5; i++ { f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i)) } - if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJobCopy)) @@ -826,7 +827,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) { t.Fatalf("Failed creating secret") } f.expectCreateSecretAction(secret) - if (mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker) || + if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) || implementation == kubeflow.MPIImplementationIntel || implementation == kubeflow.MPIImplementationMPICH { f.expectCreateServiceAction(newLauncherService(mpiJob))