run worker process in launcher pod #612

Merged: 5 commits, Feb 26, 2024
Changes from 3 commits
5 changes: 5 additions & 0 deletions deploy/v2beta1/mpi-operator.yaml
@@ -8190,6 +8190,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
+runLauncherAsWorker:
+  default: false
+  description: RunLauncherAsWorker indicates whether to run worker process
+    in launcher. Defaults to false.
+  type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
2 changes: 1 addition & 1 deletion go.mod
@@ -15,7 +15,7 @@ require (
k8s.io/code-generator v0.27.4
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
-	k8s.io/utils v0.0.0-20230209194617-a36077c30491
+	k8s.io/utils v0.0.0-20240102154912-e7106e64919e
sigs.k8s.io/controller-runtime v0.15.1
sigs.k8s.io/scheduler-plugins v0.26.7
sigs.k8s.io/structured-merge-diff/v4 v4.2.3
4 changes: 2 additions & 2 deletions go.sum
@@ -384,8 +384,8 @@ k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg=
-k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY=
-k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
+k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
+k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2 h1:trsWhjU5jZrx6UvFu4WzQDrN7Pga4a7Qg+zcfcj64PA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.2/go.mod h1:+qG7ISXqCDVVcyO8hLn12AKVYYUjM7ftlqsqmrhMZE0=
sigs.k8s.io/controller-runtime v0.15.1 h1:9UvgKD4ZJGcj24vefUFgZFP3xej/3igL9BsOUTb/+4c=
5 changes: 5 additions & 0 deletions manifests/base/kubeflow.org_mpijobs.yaml
@@ -8167,6 +8167,11 @@ spec:
description: MPIReplicaSpecs contains maps from `MPIReplicaType` to
`ReplicaSpec` that specify the MPI replicas to run.
type: object
+runLauncherAsWorker:
+  default: false
+  description: RunLauncherAsWorker indicates whether to run worker process
+    in launcher. Defaults to false.
+  type: boolean
runPolicy:
description: RunPolicy encapsulates various runtime policies of the
job.
4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
@@ -156,6 +156,10 @@
"$ref": "#/definitions/v2beta1.ReplicaSpec"
}
},
"runLauncherAsWorker": {
"description": "RunLauncherAsWorker indicates whether to run worker process in launcher Defaults to false.",
"type": "boolean"
},
"runPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the job.",
"default": {},
6 changes: 6 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -154,6 +154,12 @@ type MPIJobSpec struct {
// +kubebuilder:default:=1
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

+	// RunLauncherAsWorker indicates whether to run worker process in launcher.
+	// Defaults to false.
+	// +optional
+	// +kubebuilder:default:=false
+	RunLauncherAsWorker *bool `json:"runLauncherAsWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy RunPolicy `json:"runPolicy,omitempty"`

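For orientation, here is a minimal sketch of how a client could set the new field through the Go API types. This is not part of the diff; the import path is an assumption about the repo layout, and ptr.To comes from the k8s.io/utils/ptr package adopted later in this PR.

    package main

    import (
    	"fmt"

    	"k8s.io/utils/ptr"

    	// Assumed import path for the v2beta1 API types; adjust to the module layout.
    	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
    )

    func main() {
    	// Ask the controller to also run an MPI worker process in the launcher pod.
    	spec := kubeflow.MPIJobSpec{
    		SlotsPerWorker:      ptr.To(int32(1)),
    		RunLauncherAsWorker: ptr.To(true), // optional; a nil pointer means false
    		MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
    	}
    	fmt.Println(ptr.Deref(spec.RunLauncherAsWorker, false)) // prints: true
    }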
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

9 changes: 9 additions & 0 deletions pkg/client/applyconfiguration/kubeflow/v2beta1/mpijobspec.go

(Generated file; diff not rendered.)

39 changes: 31 additions & 8 deletions pkg/controller/mpi_job_controller.go
@@ -53,7 +53,8 @@
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"

Check failure on line 56 in pkg/controller/mpi_job_controller.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

SA1019: "k8s.io/utils/pointer" is deprecated: Use functions in k8s.io/utils/ptr instead: ptr.To to obtain a pointer, ptr.Deref to dereference a pointer, ptr.Equal to compare dereferenced pointers. (staticcheck)
"k8s.io/utils/ptr"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"

@@ -656,13 +657,14 @@
return err
}
}
-	if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
+	// If we want to run the worker process in the launcher, we should create a Service for the launcher.
Review thread on this hunk:

Collaborator: We shouldn't create an additional Service in the case of OpenMPI. Just allowing the existing Service to match the launcher pod should be enough.

Member Author: The existing Service is created with the selector training.kubeflow.org/job-role: worker, so it can't be used directly. Without creating a Service for the launcher, we would have to change the workers' Service and remove the selector. Creating a Service is preferable to me. WDYT @tenzen-y @alculquicondor

Collaborator: I think it's more efficient to just change the selector.

Member Author: @alculquicondor Do you mean changing the selector only in the case of OpenMPI, or in all cases?

Collaborator: I would just change it for all.

Collaborator: Yes. Although I'm not sure if IntelMPI would work with just one Service, as it's very picky about the hostname. Worth trying.

Member Author: OK, I will try it. Should we do the Service refactor in this PR or in a follow-up, since the current approach is already a supported way to run under the original Service design?

Collaborator: Here should be fine.

Member: SGTM

Member Author (@kuizhiqing, Feb 22, 2024): @alculquicondor @tenzen-y I've added a Service for both the launcher and the workers. For IntelMPI, which needs to reach the launcher by hostname, I modified the searches part of DNSConfig.
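The DNSConfig change mentioned in that last comment is not shown in this hunk. Purely as a hypothetical illustration of the idea (the field values are invented here, not taken from the PR), appending the launcher Service's domain to the pod's DNS searches would let workers resolve the launcher by bare hostname:

    // Hypothetical sketch: with the launcher pod's subdomain set to the launcher
    // Service, adding this search domain lets a bare "<job>-launcher" hostname
    // resolve as "<job>-launcher.<job>-launcher.<ns>.svc.cluster.local".
    podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{
    	Searches: []string{
    		fmt.Sprintf("%s.%s.svc.cluster.local", mpiJob.Name+launcherSuffix, mpiJob.Namespace),
    	},
    }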

+	// The Intel and MPICH implementations require workers to communicate with the
+	// launcher through its hostname. For that, we create a Service which
+	// has the same name as the launcher's hostname.
+	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) ||
+		mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel ||
		mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationMPICH {
-		// The Intel and MPICH implementations require workers to communicate with the
-		// launcher through its hostname. For that, we create a Service which
-		// has the same name as the launcher's hostname.
-		_, err := c.getOrCreateService(mpiJob, newLauncherService(mpiJob))
-		if err != nil {
+		if _, err = c.getOrCreateService(mpiJob, newLauncherService(mpiJob)); err != nil {
			return fmt.Errorf("getting or creating Service to front launcher: %w", err)
		}
	}
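A note on the ptr.Deref gate above: it is a nil-safe dereference, so an unset runLauncherAsWorker behaves exactly like an explicit false. A tiny self-contained illustration:

    package main

    import (
    	"fmt"

    	"k8s.io/utils/ptr"
    )

    func main() {
    	var unset *bool                             // field omitted from the spec
    	fmt.Println(ptr.Deref(unset, false))        // false: falls back to the default
    	fmt.Println(ptr.Deref(ptr.To(true), false)) // true: explicit value wins
    }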
@@ -1284,12 +1286,26 @@
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
+	// note that pod.spec.dnsConfig also affects the svc resolution
+	// ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
+	// the launcher can be reached via its hostname or service name
+	if mpiJob.Spec.RunLauncherAsWorker != nil && *mpiJob.Spec.RunLauncherAsWorker {
+		launcherService := mpiJob.Name + launcherSuffix
+		switch mpiJob.Spec.MPIImplementation {
+		case kubeflow.MPIImplementationOpenMPI:
+			buffer.WriteString(fmt.Sprintf("%s.%s.svc slots=%d\n", launcherService, mpiJob.Namespace, slots))
+		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
+			buffer.WriteString(fmt.Sprintf("%s.%s.svc:%d\n", launcherService, mpiJob.Namespace, slots))
+		}
+	}

for i := 0; i < int(workerReplicas); i++ {
+		name := workerName(mpiJob, i)
		switch mpiJob.Spec.MPIImplementation {
		case kubeflow.MPIImplementationOpenMPI:
-			buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc slots=%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc slots=%d\n", name, workersService, mpiJob.Namespace, slots))
Member: This is a nice refactoring :)

		case kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH:
-			buffer.WriteString(fmt.Sprintf("%s%s-%d.%s.%s.svc:%d\n", mpiJob.Name, workerSuffix, i, workersService, mpiJob.Namespace, slots))
+			buffer.WriteString(fmt.Sprintf("%s.%s.%s.svc:%d\n", name, workersService, mpiJob.Namespace, slots))
}
}
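To make the generated hostfile concrete: for a hypothetical OpenMPI job named pi in namespace ns with runLauncherAsWorker: true, two workers, and one slot per worker, the launcher entry plus the worker loop above would emit:

    pi-launcher.ns.svc slots=1
    pi-worker-0.pi-worker.ns.svc slots=1
    pi-worker-1.pi-worker.ns.svc slots=1

(The job and namespace names are invented for illustration; the shape matches the test expectations further down.)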

@@ -1319,6 +1335,13 @@

var buffer bytes.Buffer
buffer.WriteString("#!/bin/sh\n")

+	// We don't check whether the launcher is running here; the launcher
+	// should always be present, or the job has failed.
+	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) {
+		launcherService := mpiJob.Name + launcherSuffix
+		buffer.WriteString(fmt.Sprintf("echo %s.%s.svc\n", launcherService, mpiJob.Namespace))
+	}

workersService := mpiJob.Name + workerSuffix
for _, p := range runningPods {
buffer.WriteString(fmt.Sprintf("echo %s.%s.%s.svc\n", p.Name, workersService, p.Namespace))
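For the same hypothetical job with both workers running, the generated discover_hosts.sh would therefore look like:

    #!/bin/sh
    echo pi-launcher.ns.svc
    echo pi-worker-0.pi-worker.ns.svc
    echo pi-worker-1.pi-worker.ns.svc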
88 changes: 72 additions & 16 deletions pkg/controller/mpi_job_controller_test.go
@@ -37,7 +37,8 @@
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
clocktesting "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"

Check failure on line 40 in pkg/controller/mpi_job_controller_test.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

SA1019: "k8s.io/utils/pointer" is deprecated: Use functions in k8s.io/utils/ptr instead: ptr.To to obtain a pointer, ptr.Deref to dereference a pointer, ptr.Equal to compare dereferenced pointers. (staticcheck)
"k8s.io/utils/ptr"
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
@@ -110,18 +111,6 @@
CleanPodPolicy: &cleanPodPolicyAll,
},
MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{
-			kubeflow.MPIReplicaTypeWorker: {
-				Template: corev1.PodTemplateSpec{
-					Spec: corev1.PodSpec{
-						Containers: []corev1.Container{
-							{
-								Name:  "foo",
-								Image: "bar",
-							},
-						},
-					},
-				},
-			},
kubeflow.MPIReplicaTypeLauncher: {
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
@@ -151,7 +140,22 @@

func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob {
mpiJob := newMPIJobCommon(name, startTime, completionTime)
-	mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas
+	if ptr.Deref(replicas, 0) > 0 {
+		mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] =
+			&kubeflow.ReplicaSpec{
+				Template: corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Name:  "foo",
+								Image: "bar",
+							},
+						},
+					},
+				},
+				Replicas: replicas,
+			}
+	}
return mpiJob
}

@@ -526,7 +530,8 @@
for i := 0; i < 5; i++ {
f.expectCreatePodAction(fmjc.newWorker(mpiJobCopy, i))
}
-	if implementation == kubeflow.MPIImplementationIntel ||
+	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) ||
+		implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJobCopy))
}
@@ -822,7 +827,8 @@
t.Fatalf("Failed creating secret")
}
f.expectCreateSecretAction(secret)
-	if implementation == kubeflow.MPIImplementationIntel ||
+	if ptr.Deref(mpiJob.Spec.RunLauncherAsWorker, false) ||
+		implementation == kubeflow.MPIImplementationIntel ||
implementation == kubeflow.MPIImplementationMPICH {
f.expectCreateServiceAction(newLauncherService(mpiJob))
}
@@ -1538,7 +1544,57 @@
workerReplicas int32
wantCM *corev1.ConfigMap
}{
"OpenMPI without slots": {
"OpenMPI without slots, enable launcher as worker": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
Namespace: "tenant-a",
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 2,
wantCM: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots-config",
Namespace: "tenant-a",
Labels: map[string]string{
"app": "openmpi-without-slots",
},
},
Data: map[string]string{
"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\nopenmpi-without-slots-worker-0.openmpi-without-slots-worker.tenant-a.svc slots=1\nopenmpi-without-slots-worker-1.openmpi-without-slots-worker.tenant-a.svc slots=1\n",
},
},
},
"OpenMPI without slots, zero explicit workers": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
Namespace: "tenant-a",
},
Spec: kubeflow.MPIJobSpec{
MPIImplementation: kubeflow.MPIImplementationOpenMPI,
RunLauncherAsWorker: pointer.Bool(true),
},
},
workerReplicas: 0,
wantCM: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots-config",
Namespace: "tenant-a",
Labels: map[string]string{
"app": "openmpi-without-slots",
},
},
Data: map[string]string{
"hostfile": "openmpi-without-slots-launcher.tenant-a.svc slots=1\n",
},
},
},
"OpenMPI without slots, disable launcher as worker": {
mpiJob: &kubeflow.MPIJob{
ObjectMeta: metav1.ObjectMeta{
Name: "openmpi-without-slots",
1 change: 1 addition & 0 deletions sdk/python/v2beta1/docs/V2beta1MPIJobSpec.md

(Generated file; diff not rendered.)

30 changes: 29 additions & 1 deletion sdk/python/v2beta1/mpijob/models/v2beta1_mpi_job_spec.py

(Generated file; diff not rendered.)
