Skip to content

Commit

Permalink
Support for Image pull policy (#2101)
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky authored Apr 29, 2024
1 parent bafb009 commit f27e4ac
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 163 deletions.
7 changes: 6 additions & 1 deletion apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func PopulateHeadNodeSpec(spec rayv1api.HeadGroupSpec) *api.HeadGroupSpec {
if len(spec.Template.Spec.ImagePullSecrets) > 0 {
headNodeSpec.ImagePullSecret = spec.Template.Spec.ImagePullSecrets[0].Name
}
if spec.Template.Spec.Containers[0].ImagePullPolicy == corev1.PullAlways {
headNodeSpec.ImagePullPolicy = "Always"
}

return headNodeSpec
}
Expand Down Expand Up @@ -299,7 +302,9 @@ func PopulateWorkerNodeSpec(specs []rayv1api.WorkerGroupSpec) []*api.WorkerGroup
if len(spec.Template.Spec.ImagePullSecrets) > 0 {
workerNodeSpec.ImagePullSecret = spec.Template.Spec.ImagePullSecrets[0].Name
}

if spec.Template.Spec.Containers[0].ImagePullPolicy == corev1.PullAlways {
workerNodeSpec.ImagePullPolicy = "Always"
}
workerNodeSpecs = append(workerNodeSpecs, workerNodeSpec)
}

Expand Down
8 changes: 6 additions & 2 deletions apiserver/pkg/model/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ var headSpecTest = rayv1api.HeadGroupSpec{
},
Containers: []corev1.Container{
{
Name: "ray-head",
Image: "blublinsky1/ray310:2.5.0",
Name: "ray-head",
Image: "blublinsky1/ray310:2.5.0",
ImagePullPolicy: "Always",
Env: []corev1.EnvVar{
{
Name: "AWS_KEY",
Expand Down Expand Up @@ -471,6 +472,9 @@ func TestPopulateHeadNodeSpec(t *testing.T) {
if groupSpec.ImagePullSecret != "foo" {
t.Errorf("failed to convert image pull secret")
}
if groupSpec.ImagePullPolicy != "Always" {
t.Errorf("failed to convert image pull policy")
}
if !reflect.DeepEqual(groupSpec.Annotations, expectedAnnotations) {
t.Errorf("failed to convert annotations, got %v, expected %v", groupSpec.Annotations, expectedAnnotations)
}
Expand Down
7 changes: 7 additions & 0 deletions apiserver/pkg/server/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func ValidateClusterSpec(clusterSpec *api.ClusterSpec) error {
if len(clusterSpec.HeadGroupSpec.RayStartParams) == 0 {
return util.NewInvalidInputError("HeadGroupSpec RayStartParams is empty. Please specify values.")
}
if len(clusterSpec.HeadGroupSpec.ImagePullPolicy) > 0 &&
clusterSpec.HeadGroupSpec.ImagePullPolicy != "Always" && clusterSpec.HeadGroupSpec.ImagePullPolicy != "IfNotPresent" {
return util.NewInvalidInputError("HeadGroupSpec unsupported value for Image pull policy. Please specify Always or IfNotPresent")
}

for index, spec := range clusterSpec.WorkerGroupSpec {
if len(spec.GroupName) == 0 {
Expand All @@ -34,6 +38,9 @@ func ValidateClusterSpec(clusterSpec *api.ClusterSpec) error {
if spec.MinReplicas > spec.MaxReplicas {
return util.NewInvalidInputError("WorkerNodeSpec %d MinReplica > MaxReplicas. Please specify a valid value.", index)
}
if len(spec.ImagePullPolicy) > 0 && spec.ImagePullPolicy != "Always" && spec.ImagePullPolicy != "IfNotPresent" {
return util.NewInvalidInputError("Worker GroupSpec unsupported value for Image pull policy. Please specify Always or IfNotPresent")
}
}
return nil
}
15 changes: 15 additions & 0 deletions apiserver/pkg/server/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ func TestValidateClusterSpec(t *testing.T) {
},
expectedError: util.NewInvalidInputError("HeadGroupSpec RayStartParams is empty. Please specify values."),
},
{
name: "A head group with A wrong image pull policy",
clusterSpec: &api.ClusterSpec{
HeadGroupSpec: &api.HeadGroupSpec{
ComputeTemplate: "a template",
RayStartParams: map[string]string{
"dashboard-host": "0.0.0.0",
"metrics-export-port": "8080",
},
ImagePullPolicy: "foo",
},
WorkerGroupSpec: []*api.WorkerGroupSpec{},
},
expectedError: util.NewInvalidInputError("HeadGroupSpec unsupported value for Image pull policy. Please specify Always or IfNotPresent"),
},
{
name: "An empty worker group",
clusterSpec: &api.ClusterSpec{
Expand Down
31 changes: 25 additions & 6 deletions apiserver/pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"strconv"
"strings"

klog "k8s.io/klog/v2"

Expand Down Expand Up @@ -150,6 +151,13 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
image = spec.Image
}

// Image pull policy. Kubernetes default image pull policy IfNotPresent, so we here only
// Overwrite it if it is Always
imagePullPolicy := corev1.PullIfNotPresent
if len(spec.ImagePullPolicy) > 0 && strings.ToLower(spec.ImagePullPolicy) == "always" {
imagePullPolicy = corev1.PullAlways
}

// calculate resources
cpu := fmt.Sprint(computeRuntime.GetCpu())
memory := fmt.Sprintf("%d%s", computeRuntime.GetMemory(), "Gi")
Expand All @@ -170,8 +178,9 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
Tolerations: []corev1.Toleration{},
Containers: []corev1.Container{
{
Name: "ray-head",
Image: image,
Name: "ray-head",
Image: image,
ImagePullPolicy: imagePullPolicy,
Env: []corev1.EnvVar{
{
Name: "MY_POD_IP",
Expand Down Expand Up @@ -392,6 +401,13 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
image = spec.Image
}

// Image pull policy. Kubernetes default image pull policy IfNotPresent, so we here only
// Overwrite it if it is Always
imagePullPolicy := corev1.PullIfNotPresent
if len(spec.ImagePullPolicy) > 0 && strings.ToLower(spec.ImagePullPolicy) == "always" {
imagePullPolicy = corev1.PullAlways
}

// calculate resources
cpu := fmt.Sprint(computeRuntime.GetCpu())
memory := fmt.Sprintf("%d%s", computeRuntime.GetMemory(), "Gi")
Expand All @@ -412,8 +428,9 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
Tolerations: []corev1.Toleration{},
Containers: []corev1.Container{
{
Name: "ray-worker",
Image: image,
Name: "ray-worker",
Image: image,
ImagePullPolicy: imagePullPolicy,
Env: []corev1.EnvVar{
{
Name: "RAY_DISABLE_DOCKER_CPU_WARNING",
Expand Down Expand Up @@ -858,9 +875,11 @@ func buildAutoscalerOptions(autoscalerOptions *api.AutoscalerOptions) (*rayv1api
if len(autoscalerOptions.Image) > 0 {
options.Image = &autoscalerOptions.Image
}
if len(autoscalerOptions.ImagePullPolicy) > 0 {
options.ImagePullPolicy = (*corev1.PullPolicy)(&autoscalerOptions.ImagePullPolicy)
if len(autoscalerOptions.ImagePullPolicy) > 0 && strings.ToLower(autoscalerOptions.ImagePullPolicy) == "always" {
policy := corev1.PullAlways
options.ImagePullPolicy = &policy
}

if autoscalerOptions.Envs != nil {
if len(autoscalerOptions.Envs.Values) > 0 {
options.Env = make([]corev1.EnvVar, len(autoscalerOptions.Envs.Values))
Expand Down
9 changes: 9 additions & 0 deletions apiserver/pkg/util/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ var testAutoscalerOptions = api.AutoscalerOptions{
var headGroup = api.HeadGroupSpec{
ComputeTemplate: "foo",
Image: "bar",
ImagePullPolicy: "Always",
ServiceType: "ClusterIP",
RayStartParams: map[string]string{
"dashboard-host": "0.0.0.0",
Expand Down Expand Up @@ -172,6 +173,7 @@ var workerGroup = api.WorkerGroupSpec{
GroupName: "wg",
ComputeTemplate: "foo",
Image: "bar",
ImagePullPolicy: "Always",
Replicas: 5,
MinReplicas: 5,
MaxReplicas: 5,
Expand Down Expand Up @@ -517,6 +519,9 @@ func TestBuildHeadPodTemplate(t *testing.T) {
if podSpec.Spec.ImagePullSecrets[0].Name != "foo" {
t.Errorf("failed to propagate image pull secret")
}
if (string)(podSpec.Spec.Containers[0].ImagePullPolicy) != "Always" {
t.Errorf("failed to propagate image pull policy")
}
if len(podSpec.Spec.Containers[0].Env) != 6 {
t.Errorf("failed to propagate environment")
}
Expand Down Expand Up @@ -561,6 +566,7 @@ func TestConvertAutoscalerOptions(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, *options.IdleTimeoutSeconds, int32(25))
assert.Equal(t, (string)(*options.UpscalingMode), "Default")
assert.Equal(t, (string)(*options.ImagePullPolicy), "Always")
assert.Equal(t, len(options.Env), 1)
assert.Equal(t, len(options.EnvFrom), 2)
assert.Equal(t, len(options.VolumeMounts), 2)
Expand Down Expand Up @@ -594,6 +600,9 @@ func TestBuilWorkerPodTemplate(t *testing.T) {
if podSpec.Spec.ImagePullSecrets[0].Name != "foo" {
t.Errorf("failed to propagate image pull secret")
}
if (string)(podSpec.Spec.Containers[0].ImagePullPolicy) != "Always" {
t.Errorf("failed to propagate image pull policy")
}
if !containsEnv(podSpec.Spec.Containers[0].Env, "foo", "bar") {
t.Errorf("failed to propagate environment")
}
Expand Down
60 changes: 32 additions & 28 deletions proto/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,18 @@ message EnvironmentVariables {

message AutoscalerOptions {
// IdleTimeoutSeconds is the number of seconds to wait before scaling down a worker pod which is not using Ray resources.
// Defaults to 60 (one minute).
int32 idleTimeoutSeconds = 1;
// UpscalingMode is "Conservative", "Default", or "Aggressive."
// Conservative: Upscaling is rate-limited; the number of pending worker pods is at most the size of the Ray cluster.
// Default: Upscaling is not rate-limited.
// Aggressive: An alias for Default; upscaling is not rate-limited.
// It is not read by the KubeRay operator but by the Ray autoscaler.
string upscalingMode = 2;
// Defaults to 60 (one minute).
int32 idleTimeoutSeconds = 1;
// UpscalingMode is "Conservative", "Default", or "Aggressive."
// Conservative: Upscaling is rate-limited; the number of pending worker pods is at most the size of the Ray cluster.
// Default: Upscaling is not rate-limited.
// Aggressive: An alias for Default; upscaling is not rate-limited.
// It is not read by the KubeRay operator but by the Ray autoscaler.
string upscalingMode = 2;
// Image optionally overrides the autoscaler's container image. This override is for provided for autoscaler testing and development.
string image = 3;
// ImagePullPolicy optionally overrides the autoscaler container's image pull policy. This override is for provided for autoscaler testing and development.
string imagePullPolicy = 4;
string image = 3;
// ImagePullPolicy optionally overrides the autoscaler container's image pull policy. This override is for provided for autoscaler testing and development.
string imagePullPolicy = 4;
// Optional CPUs requirements for autoscaler - default "500m"
string cpu = 5;
// Optional memory requirements for autoscaler - default "512Mi"
Expand Down Expand Up @@ -189,10 +189,10 @@ message Cluster {
PRODUCTION = 3;
}
Environment environment = 5;

// Required field. This field indicates ray cluster configuration
ClusterSpec cluster_spec = 6 [(google.api.field_behavior) = REQUIRED];

// Optional. Annotations, for example, "kubernetes.io/ingress.class" to define Ingress class
map<string, string> annotations = 7;

Expand All @@ -210,7 +210,7 @@ message Cluster {

// Output. The list related to the cluster.
repeated ClusterEvent events = 12 [(google.api.field_behavior) = OUTPUT_ONLY];

// Output. The service endpoint of the cluster
map<string, string> service_endpoint = 13 [(google.api.field_behavior) = OUTPUT_ONLY];
}
Expand All @@ -222,9 +222,9 @@ message ClusterSpec {
// Optional. The worker group configurations
repeated WorkerGroupSpec worker_group_spec = 2;
// EnableInTreeAutoscaling indicates whether operator should create in tree autoscaling configs
bool enableInTreeAutoscaling = 3;
// AutoscalerOptions specifies optional configuration for the Ray autoscaler.
AutoscalerOptions autoscalerOptions = 4;
bool enableInTreeAutoscaling = 3;
// AutoscalerOptions specifies optional configuration for the Ray autoscaler.
AutoscalerOptions autoscalerOptions = 4;

}

Expand All @@ -242,7 +242,7 @@ message Volume {
string name = 3; // volume name
string source = 4; // volume source, for example hostpath source, secret or configMap name, etc
bool read_only = 5; // Read only flag

// If indicate hostpath, we need to let user indicate which type
// they would like to use.
enum HostPathType {
Expand All @@ -256,7 +256,7 @@ message Volume {
HOSTTOCONTAINER = 1;
BIDIRECTIONAL = 2;
}
MountPropagationMode mount_propagation_mode = 7;
MountPropagationMode mount_propagation_mode = 7;
// If indicate ephemeral, we need to let user specify volumeClaimTemplate
string storageClassName = 8; // If not defined, default is used
enum AccessMode {
Expand Down Expand Up @@ -286,15 +286,17 @@ message HeadGroupSpec {
repeated Volume volumes = 6;
// Optional. ServiceAccount used by head pod
// Note that the service account has to be created prior to usage here
string service_account = 7;
string service_account = 7;
// Optional. image pull secret used by head pod
string image_pull_secret = 8;
string image_pull_secret = 8;
// Optional. Environment variables for head pod
EnvironmentVariables environment = 9;
// Optional. Annotations for the head pod
map<string, string> annotations = 10;
// Optional. Labels for the head pod
map<string, string> labels = 11;
// Optional image pull policy We only support Always and ifNotPresent
string imagePullPolicy = 12;
}

message WorkerGroupSpec {
Expand All @@ -304,9 +306,9 @@ message WorkerGroupSpec {
string compute_template = 2 [(google.api.field_behavior) = REQUIRED];
// Optional field. This field will be used to retrieve right ray container
string image = 3;
// Required. Desired replicas of the worker group
// Required. Desired replicas of the worker group
int32 replicas = 4 [(google.api.field_behavior) = REQUIRED];
// Optional. Min replicas of the worker group, can't be greater than max_replicas.
// Optional. Min replicas of the worker group, can't be greater than max_replicas.
int32 min_replicas = 5;
// Required. Max replicas of the worker group (>0)
int32 max_replicas = 6 [(google.api.field_behavior) = REQUIRED];
Expand All @@ -316,15 +318,17 @@ message WorkerGroupSpec {
repeated Volume volumes = 8;
// Optional. ServiceAccount used by worker pod
// Note that the service account has to be created prior to usage here
string service_account = 9;
string service_account = 9;
// Optional. image pull secret used by worker pod
string image_pull_secret = 10;
string image_pull_secret = 10;
// Optional. Environment variables for worker pod
EnvironmentVariables environment = 11;
// Optional. Annotations for the worker pod
map<string, string> annotations = 12;
// Optional. Labels for the worker pod
map<string, string> labels = 13;
// Optional image pull policy We only support Always and ifNotPresent
string imagePullPolicy = 14;
}

message ClusterEvent {
Expand All @@ -334,7 +338,7 @@ message ClusterEvent {
// Human readable name for event.
string name = 2;

// Event creation time.
// Event creation time.
google.protobuf.Timestamp created_at = 3;

// The first time the event occur.
Expand All @@ -351,7 +355,7 @@ message ClusterEvent {

// Type of this event (Normal, Warning), new types could be added in the future
string type = 8;

// The number of times this event has occurred.
int32 count = 9;
}
}
Loading

0 comments on commit f27e4ac

Please sign in to comment.