Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

add job suspend run Policy #193

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/apis/common/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type ControllerInterface interface {
// It will requeue the job in case of an error while creating/deleting pods.
// Common implementation will be provided and User can still override this to implement their own reconcile logic
ReconcilePods(job interface{}, jobStatus *JobStatus, pods []*v1.Pod, rtype ReplicaType, spec *ReplicaSpec,
replicas map[ReplicaType]*ReplicaSpec) error
replicas map[ReplicaType]*ReplicaSpec, runPolicy *RunPolicy) error

// ReconcileServices checks and updates services for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting services.
Expand Down
9 changes: 8 additions & 1 deletion pkg/apis/common/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/apis/common/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ const (
// reached phase failed with no restarting.
// The training has failed its execution.
JobFailed JobConditionType = "Failed"

// JobSuspended means sub-resources (e.g. services/pods) of this job
// has been terminated.
JobSuspended JobConditionType = "Suspended"

// JobResumed means job Resumed from suspended
JobResumed JobConditionType = "Resumed"

// JobPartialSucceed means all sub-resources (e.g. services/pods) of this job's one worker
// reached phase have terminated in success.
JobPartialSucceeded JobConditionType = "PartialSucceeded"
)

// +k8s:openapi-gen=true
Expand Down Expand Up @@ -196,6 +207,17 @@ type RunPolicy struct {
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`

// Suspend specifies whether the Job controller should create Pods or not. If
// a Job is created with suspend set to true, no Pods are created by the Job
// controller. If a Job is suspended after creation (i.e. the flag goes from
// false to true), the Job controller will delete all active Pods associated
// with this Job. Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the Job, effectively
// resetting the ActiveDeadlineSeconds timer too.
// Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty"`
}

// +k8s:openapi-gen=true
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/common/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/common/v1/zz_generated.defaults.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (jc *JobController) ReconcileJobs(

// Diff current active pods/services with replicas.
for rtype, spec := range replicas {
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas, runPolicy)
if err != nil {
log.Warnf("ReconcilePods error %v", err)
return err
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ func (jc *JobController) ReconcilePods(
pods []*v1.Pod,
rType apiv1.ReplicaType,
spec *apiv1.ReplicaSpec,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
runPolicy *apiv1.RunPolicy) error {

rt := strings.ToLower(string(rType))
metaObject, ok := job.(metav1.Object)
Expand Down Expand Up @@ -317,6 +318,10 @@ func (jc *JobController) ReconcilePods(
} else if len(podSlice) == 0 {
logger.Infof("Need to create new pod: %s-%d", rt, index)

if JobSuspended(runPolicy) || commonutil.IsSuspended(*jobStatus) {
logger.Warningf("job is Suspended %s/%s", metaObject.GetNamespace(), metaObject.GetName())
continue
}
// check if this replica is the master role
masterRole = jc.Controller.IsMasterRole(replicas, rType, index)
err = jc.createNewPod(job, rt, index, spec, masterRole, replicas)
Expand All @@ -328,7 +333,7 @@ func (jc *JobController) ReconcilePods(
pod := podSlice[0]

// check if the index is in the valid range, if not, we should kill the pod
if index < 0 || index >= numReplicas {
if index < 0 || index >= numReplicas || JobSuspended(runPolicy) {
err = jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller.v1/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.R

return &minAvailableTasksRes
}

// JobSuspended returns whether a Job is suspended while taking the feature
// gate into account.
func JobSuspended(runPolicy *apiv1.RunPolicy) bool {
return runPolicy.Suspend != nil && *runPolicy.Suspend
}
2 changes: 1 addition & 1 deletion pkg/core/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func RecordAbnormalPods(activePods []*v1.Pod, object runtime.Object, recorder re

// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil || (runPolicy.Suspend != nil && *runPolicy.Suspend) {
return false
}
now := metav1.Now()
Expand Down
36 changes: 32 additions & 4 deletions pkg/util/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ const (
JobFailedReason = "JobFailed"
// JobRestarting is added in a job when it is restarting.
JobRestartingReason = "JobRestarting"

// JobSuspend is added in a job when it is been suspended.
JobSuspendedReason = "JobSuspended"
// JobResumed is added in when job Resumed.
JobResumedReason = "JobResumed"
// JobPartialSucceed is added in when job partial successed.
JobPartialSucceededReason = "JobPartialSucceeded"
// labels for pods and servers.

)
Expand All @@ -32,6 +37,16 @@ func IsFailed(status apiv1.JobStatus) bool {
return hasCondition(status, apiv1.JobFailed)
}

// IsSuspended checks if the job is suspended
func IsSuspended(status apiv1.JobStatus) bool {
return hasCondition(status, apiv1.JobSuspended)
}

// IsSuspended checks if the job is suspended
func IsPartialSucceeded(status apiv1.JobStatus) bool {
return hasCondition(status, apiv1.JobPartialSucceeded)
}

// UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message
func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
Expand Down Expand Up @@ -103,16 +118,29 @@ func filterOutCondition(conditions []apiv1.JobCondition, condType apiv1.JobCondi
if condType == apiv1.JobRestarting && c.Type == apiv1.JobRunning {
continue
}
if condType == apiv1.JobRunning && c.Type == apiv1.JobRestarting {
if condType == apiv1.JobRunning && (c.Type == apiv1.JobRestarting || c.Type == apiv1.JobResumed) {
continue
}

if condType == apiv1.JobSuspended && c.Type == apiv1.JobResumed {
continue
}

if condType == apiv1.JobResumed && c.Type == apiv1.JobSuspended {
continue
}

if condType == apiv1.JobRestarting && c.Type == apiv1.JobPartialSucceeded {
continue
}

if c.Type == condType {
continue
}

// Set the running condition status to be false when current condition failed or succeeded
if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded) && c.Type == apiv1.JobRunning {
// Set the running condition status to be false when current condition failed, succeeded or suspended
if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded ||
condType == apiv1.JobSuspended) && c.Type == apiv1.JobRunning {
c.Status = v1.ConditionFalse
}

Expand Down