
Commit 7adb54e

[Spot] Let the controller be aware of the failed setup and fail early (skypilot-org#1479)
* Let the controller be aware of the failed setup and fail early
* format
* Add test
* yapf
* add test yaml
* increase timeout for spot tests
* fix
* Add timeout for final spot status waiting
* yapf
* fix merge error
* get rid of autostop test for spot controller
* reorder
* fix comment
* Add failed setup status for spot
* Update sky/spot/recovery_strategy.py
  Co-authored-by: Zongheng Yang <[email protected]>
* Address comments
* format
* update and variable names
* format
* lint
* address comments
* Address comments
* fix test

Co-authored-by: Zongheng Yang <[email protected]>
1 parent e6167b9 commit 7adb54e

7 files changed: +184 -61 lines changed

sky/skylet/job_lib.py (+27 -6)

@@ -79,6 +79,8 @@ class JobStatus(enum.Enum):
     # The `job_id` has been generated, but the generated ray program has
     # not started yet. skylet can transit the state from INIT to FAILED
     # directly, if the ray program fails to start.
+    # In the 'jobs' table, the `submitted_at` column will be set to the
+    # current time when the job is first created (in the INIT state).
     INIT = 'INIT'
     # Running the user's setup script (only in effect if --detach-setup is
     # set). Our update_job_status() can temporarily (for a short period) set
@@ -90,6 +92,8 @@ class JobStatus(enum.Enum):
     # by the placement constraints.)
     PENDING = 'PENDING'
     # The job is running.
+    # In the 'jobs' table, the `start_at` column will be set to the current
+    # time when the job is first transitioned to RUNNING.
     RUNNING = 'RUNNING'
     # 3 terminal states below: once reached, they do not transition.
     # The job finished successfully.
@@ -290,12 +294,26 @@ def get_latest_job_id() -> Optional[int]:
     return job_id


-def get_job_time_payload(job_id: int, is_end: bool) -> Optional[int]:
-    field = 'end_at' if is_end else 'start_at'
+def get_job_submitted_or_ended_timestamp_payload(job_id: int,
+                                                 get_ended_time: bool) -> str:
+    """Get the job submitted/ended timestamp.
+
+    This function should only be called by the spot controller, for which it
+    is ok to use `submitted_at` instead of `start_at`, because the spot job
+    duration needs to include both setup and running time and the job will
+    not stay in the PENDING state.
+
+    The normal job duration will use `start_at` instead of `submitted_at`
+    (in `format_job_queue()`), because the job may stay in PENDING if the
+    cluster is busy.
+    """
+    field = 'end_at' if get_ended_time else 'submitted_at'
     rows = _CURSOR.execute(f'SELECT {field} FROM jobs WHERE job_id=(?)',
                            (job_id,))
     for (timestamp,) in rows:
         return common_utils.encode_payload(timestamp)
+    return common_utils.encode_payload(None)


 def _get_records_from_rows(rows) -> List[Dict[str, Any]]:
@@ -684,13 +702,16 @@ def get_job_status(cls, job_ids: Optional[List[int]] = None) -> str:
         return cls._build(code)

     @classmethod
-    def get_job_time_payload(cls,
-                             job_id: Optional[int] = None,
-                             is_end: bool = False) -> str:
+    def get_job_submitted_or_ended_timestamp_payload(
+            cls,
+            job_id: Optional[int] = None,
+            get_ended_time: bool = False) -> str:
         code = [
             f'job_id = {job_id} if {job_id} is not None '
             'else job_lib.get_latest_job_id()',
-            f'job_time = job_lib.get_job_time_payload(job_id, {is_end})',
+            'job_time = '
+            'job_lib.get_job_submitted_or_ended_timestamp_payload('
+            f'job_id, {get_ended_time})',
             'print(job_time, flush=True)',
         ]
         return cls._build(code)

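The renamed payload helper keeps the codegen pattern job_lib uses elsewhere: run a small query on the cluster head and print an encoded value that the controller parses from stdout. A minimal, self-contained sketch of that pattern follows; the in-memory 'jobs' table and the encode_payload stand-in are illustrative assumptions, not the actual SkyPilot helpers.

import json
import sqlite3
import time

# Illustrative in-memory stand-in for the skylet 'jobs' table.
_CONN = sqlite3.connect(':memory:')
_CURSOR = _CONN.cursor()
_CURSOR.execute(
    'CREATE TABLE jobs (job_id INTEGER, submitted_at FLOAT, end_at FLOAT)')
_CURSOR.execute('INSERT INTO jobs VALUES (1, ?, NULL)', (time.time(),))


def encode_payload(payload) -> str:
    # Stand-in for common_utils.encode_payload: serialize the value so it
    # survives a round trip over stdout.
    return json.dumps(payload)


def get_job_submitted_or_ended_timestamp_payload(job_id: int,
                                                 get_ended_time: bool) -> str:
    # Pick `end_at` for a finished job, otherwise `submitted_at` (not
    # `start_at`): the spot job duration should include setup time.
    field = 'end_at' if get_ended_time else 'submitted_at'
    rows = _CURSOR.execute(f'SELECT {field} FROM jobs WHERE job_id=(?)',
                           (job_id,))
    for (timestamp,) in rows:
        return encode_payload(timestamp)
    # No matching job: still return a well-formed payload so the caller can
    # tell "missing" apart from a transport error.
    return encode_payload(None)


print(get_job_submitted_or_ended_timestamp_payload(1, get_ended_time=False))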
sky/spot/controller.py (+11 -7)

@@ -64,9 +64,9 @@ def _run(self):
         logger.info(f'Started monitoring spot task {self._task_name} '
                     f'(id: {self._job_id})')
         spot_state.set_starting(self._job_id)
-        start_at = self._strategy_executor.launch()
+        job_submitted_at = self._strategy_executor.launch()

-        spot_state.set_started(self._job_id, start_time=start_at)
+        spot_state.set_started(self._job_id, start_time=job_submitted_at)
         while True:
             time.sleep(spot_utils.JOB_STATUS_CHECK_GAP_SECONDS)

@@ -120,7 +120,9 @@ def _run(self):
             if job_status is not None and not job_status.is_terminal():
                 # The multi-node job is still running, continue monitoring.
                 continue
-            elif job_status == job_lib.JobStatus.FAILED:
+            elif job_status in [
+                    job_lib.JobStatus.FAILED, job_lib.JobStatus.FAILED_SETUP
+            ]:
                 # The user code has probably crashed, fail immediately.
                 end_time = spot_utils.get_job_timestamp(self._backend,
                                                         self._cluster_name,
@@ -132,10 +134,12 @@ def _run(self):
                                        None,
                                        spot_job_id=self._job_id)
                 logger.info(f'\n== End of logs (ID: {self._job_id}) ==')
-                spot_state.set_failed(
-                    self._job_id,
-                    failure_type=spot_state.SpotStatus.FAILED,
-                    end_time=end_time)
+                status_to_set = spot_state.SpotStatus.FAILED
+                if job_status == job_lib.JobStatus.FAILED_SETUP:
+                    status_to_set = spot_state.SpotStatus.FAILED_SETUP
+                spot_state.set_failed(self._job_id,
+                                      failure_type=status_to_set,
+                                      end_time=end_time)
                 break
             # Although the cluster is healthy, we fail to access the
             # job status. Try to recover the job (will not restart the

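The controller change reduces to mapping the cluster-side failure onto the spot-level status before calling spot_state.set_failed, so a broken setup script is distinguishable from a crashed user program. A hedged sketch of that mapping follows, using simplified stand-ins for job_lib.JobStatus and spot_state.SpotStatus.

import enum


class JobStatus(enum.Enum):
    # Simplified stand-in for job_lib.JobStatus.
    RUNNING = 'RUNNING'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
    FAILED_SETUP = 'FAILED_SETUP'


class SpotStatus(enum.Enum):
    # Simplified stand-in for spot_state.SpotStatus.
    RUNNING = 'RUNNING'
    SUCCEEDED = 'SUCCEEDED'
    FAILED = 'FAILED'
    FAILED_SETUP = 'FAILED_SETUP'


def spot_failure_status(job_status: JobStatus) -> SpotStatus:
    # FAILED_SETUP is surfaced as its own terminal spot status; everything
    # else that reaches this branch is treated as a user-program failure.
    if job_status == JobStatus.FAILED_SETUP:
        return SpotStatus.FAILED_SETUP
    return SpotStatus.FAILED


assert spot_failure_status(JobStatus.FAILED_SETUP) is SpotStatus.FAILED_SETUP
assert spot_failure_status(JobStatus.FAILED) is SpotStatus.FAILED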
sky/spot/recovery_strategy.py (+21 -17)

@@ -80,7 +80,7 @@ def launch(self) -> Optional[float]:
         It can fail if resource is not available. Need to check the cluster
         status, after calling.

-        Returns: The job's start timestamp, or None if failed to start.
+        Returns: The job's submit timestamp, or None if failed.
         """
         if self.retry_until_up:
             return self._launch(max_retry=None)
@@ -140,8 +140,9 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
             raise_on_failure: Whether to raise an exception if the launch fails.

         Returns:
-            The job's start timestamp, or None if failed to start and
-            raise_on_failure is False.
+            The job's submit timestamp, or None if failed to submit the job
+            (either provisioning fails or any error happens in job submission)
+            and raise_on_failure is False.
         """
         # TODO(zhwu): handle the failure during `preparing sky runtime`.
         retry_cnt = 0
@@ -152,8 +153,11 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
             exception = None
             try:
                 usage_lib.messages.usage.set_internal()
+                # Detach setup, so that the setup failure can be detected
+                # by the controller process (job_status -> FAILED_SETUP).
                 sky.launch(self.dag,
                            cluster_name=self.cluster_name,
+                           detach_setup=True,
                            detach_run=True,
                            _is_launched_by_spot_controller=True)
                 logger.info('Spot cluster launched.')
@@ -202,7 +206,7 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
                 # The cluster can be preempted before the job is launched.
                 # Break to let the retry launch kick in.
                 logger.info('The cluster is preempted before the job '
-                            'starts.')
+                            'is submitted.')
                 # TODO(zhwu): we should recover the preemption with the
                 # recovery strategy instead of the current while loop.
                 retry_launch = True
@@ -223,11 +227,11 @@ def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
                 continue

             # Check the job status until it is not in initialized status
-            if status is not None and job_lib.JobStatus.PENDING < status:
+            if status is not None and status > job_lib.JobStatus.INIT:
                 try:
-                    launch_time = spot_utils.get_job_timestamp(
+                    job_submitted_at = spot_utils.get_job_timestamp(
                         self.backend, self.cluster_name, get_end_time=False)
-                    return launch_time
+                    return job_submitted_at
                 except Exception as e:  # pylint: disable=broad-except
                     # If we failed to get the job timestamp, we will retry
                     # job checking loop.
@@ -271,16 +275,16 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend',
                                                     'sky.clouds.Region']] = None

     def _launch(self, max_retry=3, raise_on_failure=True) -> Optional[float]:
-        launch_time = super()._launch(max_retry, raise_on_failure)
-        if launch_time is not None:
+        job_submitted_at = super()._launch(max_retry, raise_on_failure)
+        if job_submitted_at is not None:
             # Only record the cloud/region if the launch is successful.
             handle = global_user_state.get_handle_from_cluster_name(
                 self.cluster_name)
             assert handle is not None, 'Cluster should be launched.'
             launched_resources = handle.launched_resources
             self._launched_cloud_region = (launched_resources.cloud,
                                            launched_resources.region)
-        return launch_time
+        return job_submitted_at

     def recover(self) -> float:
         # 1. Cancel the jobs and launch the cluster with the STOPPED status,
@@ -308,11 +312,11 @@ def recover(self) -> float:
                                             region=launched_region)
             task.set_resources({new_resources})
             # Not using self.launch to avoid the retry until up logic.
-            launched_time = self._launch(raise_on_failure=False)
+            job_submitted_at = self._launch(raise_on_failure=False)
             # Restore the original dag, i.e. reset the region constraint.
             task.set_resources({original_resources})
-            if launched_time is not None:
-                return launched_time
+            if job_submitted_at is not None:
+                return job_submitted_at

         # Step 2
         logger.debug('Terminating unhealthy spot cluster and '
@@ -324,9 +328,9 @@ def recover(self) -> float:
             logger.debug('Relaunch the cluster without constraining to prior '
                          'cloud/region.')
             # Not using self.launch to avoid the retry until up logic.
-            launched_time = self._launch(max_retry=self._MAX_RETRY_CNT,
-                                         raise_on_failure=False)
-            if launched_time is None:
+            job_submitted_at = self._launch(max_retry=self._MAX_RETRY_CNT,
+                                            raise_on_failure=False)
+            if job_submitted_at is None:
                 # Failed to launch the cluster.
                 if self.retry_until_up:
                     gap_seconds = self.RETRY_INIT_GAP_SECONDS
@@ -339,4 +343,4 @@ def recover(self) -> float:
                     f'Failed to recover the spot cluster after retrying '
                     f'{self._MAX_RETRY_CNT} times.')

-        return launched_time
+        return job_submitted_at

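The subtle change in _launch is the submission gate: `status > job_lib.JobStatus.INIT` replaces `job_lib.JobStatus.PENDING < status`. With detach_setup=True, a job still in SETTING_UP already has its `submitted_at` recorded, so only INIT needs to be excluded. Below is a small sketch with an ordered stand-in enum; the numeric values and comparison method are assumptions for illustration, since the real job_lib.JobStatus defines its own ordering.

import enum
from typing import Optional


class JobStatus(enum.Enum):
    # Lifecycle order matches the comments in sky/skylet/job_lib.py above.
    INIT = 0
    SETTING_UP = 1
    PENDING = 2
    RUNNING = 3
    SUCCEEDED = 4
    FAILED = 5
    FAILED_SETUP = 6

    def __gt__(self, other: 'JobStatus') -> bool:
        # Compare by lifecycle position.
        return self.value > other.value


def job_is_submitted(status: Optional[JobStatus]) -> bool:
    # Anything past INIT (SETTING_UP, PENDING, RUNNING, or a terminal state)
    # means the job has reached the spot cluster, so its submitted timestamp
    # can be fetched. The old `PENDING < status` gate would have skipped
    # SETTING_UP and PENDING.
    return status is not None and status > JobStatus.INIT


assert job_is_submitted(JobStatus.SETTING_UP)
assert not job_is_submitted(JobStatus.INIT)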
sky/spot/spot_state.py (+58 -4)

@@ -35,8 +35,9 @@
     recovery_count INTEGER DEFAULT 0,
     job_duration FLOAT DEFAULT 0)""")

-# job_duration is the time a job actually runs before last_recover,
-# excluding the provision and recovery time.
+# job_duration is the time a job actually runs (including the
+# setup duration) before last_recover, excluding the provision
+# and recovery time.
 # If the job is not finished:
 # total_job_duration = now() - last_recovered_at + job_duration
 # If the job is not finished:
@@ -51,17 +52,66 @@


 class SpotStatus(enum.Enum):
-    """Spot job status, designed to be in serverless style"""
+    """Spot job status, designed to be in serverless style.
+
+    The SpotStatus is a higher-level status than the JobStatus.
+    Each spot job submitted to the spot cluster will have a JobStatus
+    on that spot cluster:
+        JobStatus = [INIT, SETTING_UP, PENDING, RUNNING, ...]
+    Whenever the spot cluster is preempted and recovered, the JobStatus
+    will go through the statuses above again.
+    That means during the lifetime of a spot job, its JobStatus could be
+    reset to INIT or SETTING_UP multiple times (depending on the preemptions).
+
+    However, a spot job only has one SpotStatus on the spot controller:
+        SpotStatus = [PENDING, SUBMITTED, STARTING, RUNNING, ...]
+    Mapping from JobStatus to SpotStatus:
+        INIT         -> STARTING/RECOVERING
+        SETTING_UP   -> RUNNING
+        PENDING      -> RUNNING
+        RUNNING      -> RUNNING
+        SUCCEEDED    -> SUCCEEDED
+        FAILED       -> FAILED
+        FAILED_SETUP -> FAILED_SETUP
+    Note that the JobStatus will not be stuck in PENDING, because each spot
+    cluster is dedicated to a spot job, i.e. there should always be enough
+    resources to run the job, and the job will be immediately transitioned
+    to RUNNING.
+    """
+    # PENDING: Waiting for the spot controller to have a slot to run the
+    # controller process.
+    # The submitted_at timestamp of the spot job in the 'spot' table will be
+    # set to the time when the job is first submitted by the user (set to
+    # PENDING).
     PENDING = 'PENDING'
+    # SUBMITTED: The spot controller starts the controller process.
     SUBMITTED = 'SUBMITTED'
+    # STARTING: The controller process is launching the spot cluster for
+    # the spot job.
     STARTING = 'STARTING'
+    # RUNNING: The job is submitted to the spot cluster, and is setting up
+    # or running.
+    # The start_at timestamp of the spot job in the 'spot' table will be set
+    # to the time when the job is first transitioned to RUNNING.
     RUNNING = 'RUNNING'
+    # RECOVERING: The spot cluster is preempted, and the controller process
+    # is recovering the spot cluster (relaunching/failover).
     RECOVERING = 'RECOVERING'
     # Terminal statuses
+    # SUCCEEDED: The job finished successfully.
     SUCCEEDED = 'SUCCEEDED'
+    # FAILED: The job finished with a failure from the user's program.
     FAILED = 'FAILED'
+    # FAILED_SETUP: The job finished with a failure from the user's setup
+    # script.
+    FAILED_SETUP = 'FAILED_SETUP'
+    # FAILED_NO_RESOURCE: The job finished with a failure because there is no
+    # resource available in the cloud provider(s) to launch the spot cluster.
     FAILED_NO_RESOURCE = 'FAILED_NO_RESOURCE'
+    # FAILED_CONTROLLER: The job finished with a failure because of an
+    # unexpected error in the controller process.
     FAILED_CONTROLLER = 'FAILED_CONTROLLER'
+    # CANCELLED: The job is cancelled by the user.
     CANCELLED = 'CANCELLED'

     def is_terminal(self) -> bool:
@@ -83,7 +133,10 @@ def terminal_statuses(cls) -> List['SpotStatus']:

     @classmethod
     def failure_statuses(cls) -> List['SpotStatus']:
-        return [cls.FAILED, cls.FAILED_NO_RESOURCE, cls.FAILED_CONTROLLER]
+        return [
+            cls.FAILED, cls.FAILED_SETUP, cls.FAILED_NO_RESOURCE,
+            cls.FAILED_CONTROLLER
+        ]


 _SPOT_STATUS_TO_COLOR = {
@@ -94,6 +147,7 @@ def failure_statuses(cls) -> List['SpotStatus']:
     SpotStatus.RECOVERING: colorama.Fore.CYAN,
     SpotStatus.SUCCEEDED: colorama.Fore.GREEN,
     SpotStatus.FAILED: colorama.Fore.RED,
+    SpotStatus.FAILED_SETUP: colorama.Fore.RED,
     SpotStatus.FAILED_NO_RESOURCE: colorama.Fore.RED,
     SpotStatus.FAILED_CONTROLLER: colorama.Fore.RED,
     SpotStatus.CANCELLED: colorama.Fore.YELLOW,

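The new docstring describes the JobStatus -> SpotStatus mapping in prose; restated as data it looks like the sketch below. This is illustrative only (the controller derives the spot status from control flow rather than a lookup table), and the enum member names are used as plain strings.

# Documented mapping, one row per cluster-side JobStatus.
JOB_TO_SPOT_STATUS = {
    'SETTING_UP': 'RUNNING',  # setup time counts toward the spot job runtime
    'PENDING': 'RUNNING',     # the cluster is dedicated, so PENDING is brief
    'RUNNING': 'RUNNING',
    'SUCCEEDED': 'SUCCEEDED',
    'FAILED': 'FAILED',
    'FAILED_SETUP': 'FAILED_SETUP',
}


def spot_status_for_init(is_recovery: bool) -> str:
    # INIT maps to STARTING on the first launch and to RECOVERING after a
    # preemption, so it depends on controller state rather than the table.
    return 'RECOVERING' if is_recovery else 'STARTING'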
sky/spot/spot_utils.py (+16 -7)

@@ -43,6 +43,12 @@
 _JOB_CANCELLED_MESSAGE = ('[bold cyan]Waiting for the job status to be updated.'
                           '[/] It may take a minute.')

+# The maximum time to wait for the spot job status to transition to a terminal
+# state after the job finishes. This is a safeguard to avoid the case where
+# the spot job status fails to be updated and keeps `sky spot logs` blocking
+# for a long time.
+_FINAL_SPOT_STATUS_WAIT_TIMEOUT_SECONDS = 10
+

 class UserSignal(enum.Enum):
     """The signal to be sent to the user."""
@@ -119,9 +125,9 @@ def update_spot_job_status(job_id: Optional[int] = None):

 def get_job_timestamp(backend: 'backends.CloudVmRayBackend', cluster_name: str,
                       get_end_time: bool) -> float:
-    """Get the started/ended time of the job."""
-    code = job_lib.JobLibCodeGen.get_job_time_payload(job_id=None,
-                                                      is_end=get_end_time)
+    """Get the submitted/ended time of the job."""
+    code = job_lib.JobLibCodeGen.get_job_submitted_or_ended_timestamp_payload(
+        job_id=None, get_ended_time=get_end_time)
     handle = global_user_state.get_handle_from_cluster_name(cluster_name)
     returncode, stdout, stderr = backend.run_on_head(handle,
                                                      code,
@@ -317,10 +323,13 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
     # The spot_status may not be in terminal status yet, since the controller
     # has not updated the spot state yet. We wait for a while, until the spot
     # state is updated.
+    wait_seconds = 0
     spot_status = spot_state.get_status(job_id)
     assert spot_status is not None, job_id
-    while not spot_status.is_terminal() and follow:
+    while (not spot_status.is_terminal() and follow and
+           wait_seconds < _FINAL_SPOT_STATUS_WAIT_TIMEOUT_SECONDS):
         time.sleep(1)
+        wait_seconds += 1
         spot_status = spot_state.get_status(job_id)
         assert spot_status is not None, job_id

@@ -351,12 +360,12 @@ def dump_spot_job_queue() -> str:
         if end_at is None:
             end_at = time.time()

-        job_start_at = job['last_recovered_at'] - job['job_duration']
+        job_submitted_at = job['last_recovered_at'] - job['job_duration']
         if job['status'] == spot_state.SpotStatus.RECOVERING:
             # When job is recovering, the duration is exactly job['job_duration']
             job_duration = job['job_duration']
-        elif job_start_at > 0:
-            job_duration = end_at - job_start_at
+        elif job_submitted_at > 0:
+            job_duration = end_at - job_submitted_at
         else:
             # When job_start_at <= 0, that means the last_recovered_at is not
             # set yet, i.e. the job is not started.

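Two pieces of the spot_utils change are easy to miss: the log-streaming wait is now bounded by _FINAL_SPOT_STATUS_WAIT_TIMEOUT_SECONDS, and the queue dump derives a setup-inclusive duration from last_recovered_at - job_duration. A sketch of both follows, where get_status and the job dict are stand-ins for the real spot_state API.

import time

_FINAL_SPOT_STATUS_WAIT_TIMEOUT_SECONDS = 10


def wait_for_terminal_status(job_id, get_status, follow=True):
    # Poll for at most the timeout so `sky spot logs` cannot block forever
    # when the controller never flips the job to a terminal state.
    wait_seconds = 0
    status = get_status(job_id)
    while (not status.is_terminal() and follow and
           wait_seconds < _FINAL_SPOT_STATUS_WAIT_TIMEOUT_SECONDS):
        time.sleep(1)
        wait_seconds += 1
        status = get_status(job_id)
    return status


def spot_job_duration(job, now=None):
    # Mirrors dump_spot_job_queue(): while RECOVERING only the accumulated
    # job_duration counts; otherwise measure from the setup-inclusive
    # submission point implied by last_recovered_at - job_duration.
    end_at = job['end_at'] if job['end_at'] is not None else (now or time.time())
    job_submitted_at = job['last_recovered_at'] - job['job_duration']
    if job['status'] == 'RECOVERING':
        return job['job_duration']
    if job_submitted_at > 0:
        return end_at - job_submitted_at
    return 0.0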