Skip to content
This repository was archived by the owner on Sep 12, 2023. It is now read-only.

Commit cd3a8ee

Browse files
committed
add job suspend run Policy
add job partial success status
1 parent 2b40c8f commit cd3a8ee

File tree

10 files changed

+85
-12
lines changed

10 files changed

+85
-12
lines changed

pkg/apis/common/v1/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type ControllerInterface interface {
6464
// It will requeue the job in case of an error while creating/deleting pods.
6565
// Common implementation will be provided and User can still override this to implement their own reconcile logic
6666
ReconcilePods(job interface{}, jobStatus *JobStatus, pods []*v1.Pod, rtype ReplicaType, spec *ReplicaSpec,
67-
replicas map[ReplicaType]*ReplicaSpec) error
67+
replicas map[ReplicaType]*ReplicaSpec, runPolicy *RunPolicy) error
6868

6969
// ReconcileServices checks and updates services for each given ReplicaSpec.
7070
// It will requeue the job in case of an error while creating/deleting services.

pkg/apis/common/v1/openapi_generated.go

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/common/v1/types.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,17 @@ const (
135135
// reached phase failed with no restarting.
136136
// The training has failed its execution.
137137
JobFailed JobConditionType = "Failed"
138+
139+
// JobSuspended means sub-resources (e.g. services/pods) of this job
140+
// has been terminated.
141+
JobSuspended JobConditionType = "Suspended"
142+
143+
// JobResumed means job Resumed from suspended
144+
JobResumed JobConditionType = "Resumed"
145+
146+
// JobPartialSucceed means all sub-resources (e.g. services/pods) of this job's one worker
147+
// reached phase have terminated in success.
148+
JobPartialSucceeded JobConditionType = "PartialSucceeded"
138149
)
139150

140151
// +k8s:openapi-gen=true
@@ -196,6 +207,17 @@ type RunPolicy struct {
196207
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
197208
// +optional
198209
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`
210+
211+
// Suspend specifies whether the Job controller should create Pods or not. If
212+
// a Job is created with suspend set to true, no Pods are created by the Job
213+
// controller. If a Job is suspended after creation (i.e. the flag goes from
214+
// false to true), the Job controller will delete all active Pods associated
215+
// with this Job. Users must design their workload to gracefully handle this.
216+
// Suspending a Job will reset the StartTime field of the Job, effectively
217+
// resetting the ActiveDeadlineSeconds timer too.
218+
// Defaults to false.
219+
// +optional
220+
Suspend *bool `json:"suspend,omitempty"`
199221
}
200222

201223
// +k8s:openapi-gen=true

pkg/apis/common/v1/zz_generated.deepcopy.go

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/common/v1/zz_generated.defaults.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller.v1/common/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (jc *JobController) ReconcileJobs(
270270

271271
// Diff current active pods/services with replicas.
272272
for rtype, spec := range replicas {
273-
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
273+
err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas, runPolicy)
274274
if err != nil {
275275
log.Warnf("ReconcilePods error %v", err)
276276
return err

pkg/controller.v1/common/pod.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,8 @@ func (jc *JobController) ReconcilePods(
274274
pods []*v1.Pod,
275275
rType apiv1.ReplicaType,
276276
spec *apiv1.ReplicaSpec,
277-
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error {
277+
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
278+
runPolicy *apiv1.RunPolicy) error {
278279

279280
rt := strings.ToLower(string(rType))
280281
metaObject, ok := job.(metav1.Object)
@@ -317,6 +318,10 @@ func (jc *JobController) ReconcilePods(
317318
} else if len(podSlice) == 0 {
318319
logger.Infof("Need to create new pod: %s-%d", rt, index)
319320

321+
if JobSuspended(runPolicy) || commonutil.IsSuspended(*jobStatus) {
322+
logger.Warningf("job is Suspended %s/%s", metaObject.GetNamespace(), metaObject.GetName())
323+
continue
324+
}
320325
// check if this replica is the master role
321326
masterRole = jc.Controller.IsMasterRole(replicas, rType, index)
322327
err = jc.createNewPod(job, rt, index, spec, masterRole, replicas)
@@ -328,7 +333,7 @@ func (jc *JobController) ReconcilePods(
328333
pod := podSlice[0]
329334

330335
// check if the index is in the valid range, if not, we should kill the pod
331-
if index < 0 || index >= numReplicas {
336+
if index < 0 || index >= numReplicas || JobSuspended(runPolicy) {
332337
err = jc.PodControl.DeletePod(pod.Namespace, pod.Name, runtimeObject)
333338
if err != nil {
334339
return err

pkg/controller.v1/common/util.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,9 @@ func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.R
143143

144144
return &minAvailableTasksRes
145145
}
146+
147+
// JobSuspended returns whether a Job is suspended while taking the feature
148+
// gate into account.
149+
func JobSuspended(runPolicy *apiv1.RunPolicy) bool {
150+
return runPolicy.Suspend != nil && *runPolicy.Suspend
151+
}

pkg/core/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func RecordAbnormalPods(activePods []*v1.Pod, object runtime.Object, recorder re
6464

6565
// PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
6666
func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
67-
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
67+
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil || (runPolicy.Suspend != nil && *runPolicy.Suspend) {
6868
return false
6969
}
7070
now := metav1.Now()

pkg/util/status.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ const (
1717
JobFailedReason = "JobFailed"
1818
// JobRestarting is added in a job when it is restarting.
1919
JobRestartingReason = "JobRestarting"
20-
20+
// JobSuspend is added in a job when it is been suspended.
21+
JobSuspendedReason = "JobSuspended"
22+
// JobResumed is added in when job Resumed.
23+
JobResumedReason = "JobResumed"
24+
// JobPartialSucceed is added in when job partial successed.
25+
JobPartialSucceededReason = "JobPartialSucceeded"
2126
// labels for pods and servers.
2227

2328
)
@@ -32,6 +37,16 @@ func IsFailed(status apiv1.JobStatus) bool {
3237
return hasCondition(status, apiv1.JobFailed)
3338
}
3439

40+
// IsSuspended checks if the job is suspended
41+
func IsSuspended(status apiv1.JobStatus) bool {
42+
return hasCondition(status, apiv1.JobSuspended)
43+
}
44+
45+
// IsSuspended checks if the job is suspended
46+
func IsPartialSucceeded(status apiv1.JobStatus) bool {
47+
return hasCondition(status, apiv1.JobPartialSucceeded)
48+
}
49+
3550
// UpdateJobConditions adds to the jobStatus a new condition if needed, with the conditionType, reason, and message
3651
func UpdateJobConditions(jobStatus *apiv1.JobStatus, conditionType apiv1.JobConditionType, reason, message string) error {
3752
condition := newCondition(conditionType, reason, message)
@@ -103,16 +118,29 @@ func filterOutCondition(conditions []apiv1.JobCondition, condType apiv1.JobCondi
103118
if condType == apiv1.JobRestarting && c.Type == apiv1.JobRunning {
104119
continue
105120
}
106-
if condType == apiv1.JobRunning && c.Type == apiv1.JobRestarting {
121+
if condType == apiv1.JobRunning && (c.Type == apiv1.JobRestarting || c.Type == apiv1.JobResumed) {
122+
continue
123+
}
124+
125+
if condType == apiv1.JobSuspended && c.Type == apiv1.JobResumed {
126+
continue
127+
}
128+
129+
if condType == apiv1.JobResumed && c.Type == apiv1.JobSuspended {
130+
continue
131+
}
132+
133+
if condType == apiv1.JobRestarting && c.Type == apiv1.JobPartialSucceeded {
107134
continue
108135
}
109136

110137
if c.Type == condType {
111138
continue
112139
}
113140

114-
// Set the running condition status to be false when current condition failed or succeeded
115-
if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded) && c.Type == apiv1.JobRunning {
141+
// Set the running condition status to be false when current condition failed, succeeded or suspended
142+
if (condType == apiv1.JobFailed || condType == apiv1.JobSucceeded ||
143+
condType == apiv1.JobSuspended) && c.Type == apiv1.JobRunning {
116144
c.Status = v1.ConditionFalse
117145
}
118146

0 commit comments

Comments
 (0)