Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 6229 flytek8s #6260

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,18 +535,9 @@ func mergeCustomPodSpec(primaryContainer *v1.Container, podSpec *v1.PodSpec, k8s
"Unable to unmarshal pod spec [%v], Err: [%v]", k8sPod.GetPodSpec(), err.Error())
}

for _, container := range customPodSpec.Containers {
if container.Name != primaryContainer.Name { // Only support the primary container for now
continue
}

if len(container.Resources.Requests) > 0 || len(container.Resources.Limits) > 0 {
primaryContainer.Resources = container.Resources
}
}

if customPodSpec.RuntimeClassName != nil {
podSpec.RuntimeClassName = customPodSpec.RuntimeClassName
podSpec, err = flytek8s.MergePodSpecs(podSpec, customPodSpec, primaryContainer.Name, "")
if err != nil {
return nil, err
}

return podSpec, nil
Expand Down
260 changes: 196 additions & 64 deletions flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,13 @@ func TestBuildResourceRayExtendedResources(t *testing.T) {
}
}

// rayPodAssertions groups the expected values that
// TestBuildResourceRayCustomK8SPod checks against a generated Ray pod spec.
// Each test case carries one instance for the head pod and one for the
// worker pods.
type rayPodAssertions struct {
// resources is the expected Resources of the pod's first container.
// The test skips this comparison when the pointer is nil.
resources *corev1.ResourceRequirements
// runtimeClassName is compared against PodSpec.RuntimeClassName in every
// test case, including those that leave it unset.
runtimeClassName *string
// tolerations is compared against PodSpec.Tolerations in every test case.
tolerations []corev1.Toleration
// affinity is compared against PodSpec.Affinity in every test case; cases
// without a custom affinity set it to an empty &corev1.Affinity{}.
affinity *corev1.Affinity
}

func TestBuildResourceRayCustomK8SPod(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{}))

Expand All @@ -457,76 +464,197 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) {

nvidiaRuntimeClassName := "nvidia-cdi"

headPodSpecCustomResources := &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Resources: *expectedHeadResources,
},
headTolerations := []corev1.Toleration{
{
Key: "head",
Operator: corev1.TolerationOpEqual,
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
}
workerPodSpecCustomResources := &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-worker",
Resources: *expectedWorkerResources,
},

workerTolerations := []corev1.Toleration{
{
Key: "worker",
Operator: corev1.TolerationOpEqual,
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
}

headPodSpecCustomRuntimeClass := &corev1.PodSpec{
RuntimeClassName: &nvidiaRuntimeClassName,
headAffinity := &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-type",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"head-node"},
},
},
},
},
},
},
}
workerPodSpecCustomRuntimeClass := &corev1.PodSpec{
RuntimeClassName: &nvidiaRuntimeClassName,

workerAffinity := &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node-type",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"worker-node"},
},
},
},
},
},
},
}

params := []struct {
name string
taskResources *corev1.ResourceRequirements
headK8SPod *core.K8SPod
workerK8SPod *core.K8SPod
expectedSubmitterResources *corev1.ResourceRequirements
expectedHeadResources *corev1.ResourceRequirements
expectedWorkerResources *corev1.ResourceRequirements
expectedSubmitterRuntimeClassName *string
expectedHeadRuntimeClassName *string
expectedWorkerRuntimeClassName *string
name string
headK8SPod *core.K8SPod
workerK8SPod *core.K8SPod
headPodAssertions rayPodAssertions
workerPodAssertions rayPodAssertions
}{
{
name: "task resources",
taskResources: resourceRequirements,
expectedSubmitterResources: resourceRequirements,
expectedHeadResources: resourceRequirements,
expectedWorkerResources: resourceRequirements,
name: "no customizations",
headK8SPod: &core.K8SPod{},
workerK8SPod: &core.K8SPod{},
headPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
},
workerPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
},
},
{
name: "custom worker and head resources",
headK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Resources: *expectedHeadResources,
},
},
}),
},
workerK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-worker",
Resources: *expectedWorkerResources,
},
},
}),
},
headPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: expectedHeadResources,
},
workerPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: expectedWorkerResources,
},
},
{
name: "custom runtime class name",
headK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
RuntimeClassName: &nvidiaRuntimeClassName,
Containers: []corev1.Container{
{
Name: "ray-head",
},
},
}),
},
workerK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
RuntimeClassName: &nvidiaRuntimeClassName,
Containers: []corev1.Container{
{
Name: "ray-worker",
},
},
}),
},
headPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
runtimeClassName: &nvidiaRuntimeClassName,
},
workerPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
runtimeClassName: &nvidiaRuntimeClassName,
},
},
{
name: "custom worker and head resources",
taskResources: resourceRequirements,
name: "custom tolerations",
headK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, headPodSpecCustomResources),
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
},
},
Tolerations: headTolerations,
}),
},
workerK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, workerPodSpecCustomResources),
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-worker",
},
},
Tolerations: workerTolerations,
}),
},
headPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
tolerations: headTolerations,
},
workerPodAssertions: rayPodAssertions{
affinity: &corev1.Affinity{},
resources: resourceRequirements,
tolerations: workerTolerations,
},
expectedSubmitterResources: resourceRequirements,
expectedHeadResources: expectedHeadResources,
expectedWorkerResources: expectedWorkerResources,
},
{
name: "custom runtime class name",
taskResources: resourceRequirements,
expectedSubmitterResources: resourceRequirements,
expectedHeadResources: resourceRequirements,
expectedWorkerResources: resourceRequirements,
name: "custom affinity",
headK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, headPodSpecCustomRuntimeClass),
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Affinity: headAffinity,
}),
},
workerK8SPod: &core.K8SPod{
PodSpec: transformStructToStructPB(t, workerPodSpecCustomRuntimeClass),
PodSpec: transformStructToStructPB(t, &corev1.PodSpec{
Affinity: workerAffinity,
}),
},
headPodAssertions: rayPodAssertions{
affinity: headAffinity,
resources: resourceRequirements,
},
workerPodAssertions: rayPodAssertions{
affinity: workerAffinity,
resources: resourceRequirements,
},
expectedHeadRuntimeClassName: &nvidiaRuntimeClassName,
expectedWorkerRuntimeClassName: &nvidiaRuntimeClassName,
},
}

Expand All @@ -545,37 +673,41 @@ func TestBuildResourceRayCustomK8SPod(t *testing.T) {
}

taskTemplate := dummyRayTaskTemplate("ray-id", rayJobInput)
taskContext := dummyRayTaskContext(taskTemplate, p.taskResources, nil, "", serviceAccount)
taskContext := dummyRayTaskContext(taskTemplate, resourceRequirements, nil, "", serviceAccount)
rayJobResourceHandler := rayJobResourceHandler{}
r, err := rayJobResourceHandler.BuildResource(context.TODO(), taskContext)
assert.Nil(t, err)
assert.NotNil(t, r)
rayJob, ok := r.(*rayv1.RayJob)
assert.True(t, ok)

submitterPodResources := rayJob.Spec.SubmitterPodTemplate.Spec.Containers[0].Resources
assert.EqualValues(t,
p.expectedSubmitterResources,
&submitterPodResources,
)

headPodSpec := rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec
headPodResources := headPodSpec.Containers[0].Resources
assert.EqualValues(t,
p.expectedHeadResources,
&headPodResources,
)
if p.headPodAssertions.resources != nil {
assert.EqualValues(t,
*p.headPodAssertions.resources,
headPodResources,
)
}

assert.EqualValues(t, p.expectedHeadRuntimeClassName, headPodSpec.RuntimeClassName)
assert.EqualValues(t, p.headPodAssertions.runtimeClassName, headPodSpec.RuntimeClassName)
assert.EqualValues(t, p.headPodAssertions.tolerations, headPodSpec.Tolerations)
assert.EqualValues(t, p.headPodAssertions.affinity, headPodSpec.Affinity)

for _, workerGroupSpec := range rayJob.Spec.RayClusterSpec.WorkerGroupSpecs {
workerPodSpec := workerGroupSpec.Template.Spec
workerPodResources := workerPodSpec.Containers[0].Resources
assert.EqualValues(t,
p.expectedWorkerResources,
&workerPodResources,
)
assert.EqualValues(t, p.expectedWorkerRuntimeClassName, workerPodSpec.RuntimeClassName)

if p.workerPodAssertions.resources != nil {
assert.EqualValues(t,
*p.workerPodAssertions.resources,
workerPodResources,
)
}

assert.EqualValues(t, p.workerPodAssertions.runtimeClassName, workerPodSpec.RuntimeClassName)
assert.EqualValues(t, p.workerPodAssertions.tolerations, workerPodSpec.Tolerations)
assert.EqualValues(t, p.workerPodAssertions.affinity, workerPodSpec.Affinity)
}
})
}
Expand Down
Loading