Skip to content

Commit b5e277e

Browse files
authored
Auto stop for cluster (skypilot-org#653)
* refactorize skylet * implement autostop event without cluster stopping * wip * Remove autostop from yaml file * fix naming * fix config * fix skylet * add autostop to status * fix state and name match * Replace min_workers/max_workers for gcp * using ray up / ray down process * fix stopping * set autostop in globle user state * update sky status * format * Add refresh to sky status * address comments * comment * address comments * Fix logging * update help * remove ssh config and bring cursor back * Fix exec on stopped instance * address comment * format * fix * Add test for autostop * Fix cancel * address comment * address comment * Fix sky launch will change autostop to -1 * format * Add docs * update
1 parent 0cf0765 commit b5e277e

22 files changed

+756
-169
lines changed

docs/source/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Key features:
3030
examples/syncing-code-artifacts
3131
examples/auto-failover
3232
reference/job-queue
33+
reference/auto-stop
3334
examples/grid-search
3435
examples/distributed-jobs
3536
.. Additional Examples <https://github.com/concretevitamin/sky-experiments/tree/master/prototype/examples>

docs/source/reference/auto-stop.rst

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
.. _job-queue:
2+
Auto-stopping
3+
=========
4+
5+
Sky's **auto-stopping** can automatically stop a cluster after a few minutes of idleness.
6+
7+
To setup auto-stopping for a cluster, :code:`sky autostop` can be used.
8+
9+
.. code-block:: bash
10+
11+
# Launch a cluster with logging detached
12+
sky launch -c mycluster -d cluster.yaml
13+
14+
# Set auto-stopping for the cluster, after cluster will be stopped 10 minutes of idleness
15+
sky autostop mycluster -i 10
16+
17+
The :code:`-d / --detach` flag detaches logging from the terminal.
18+
19+
To cancel the auto-stop scheduled on the cluster:
20+
21+
.. code-block:: bash
22+
23+
# Cancel auto-stop for the cluster
24+
sky autostop mycluster --cancel
25+
26+
To view the status of the cluster:
27+
28+
.. code-block:: bash
29+
30+
# Show a cluster's jobs (IDs, statuses).
31+
sky status
32+
NAME LAUNCHED RESOURCES STATUS AUTOSTOP COMMAND
33+
mucluster 1 min ago 2x AWS(m4.2xlarge) UP 0 min sky launch -d -c ...
34+
35+
# Refresh the status for auto-stopping
36+
sky status --refresh
37+
NAME LAUNCHED RESOURCES STATUS AUTOSTOP COMMAND
38+
mucluster 1 min ago 2x AWS(m4.2xlarge) STOPPED - sky launch -d -c ...
39+
40+
41+
The cluster status in :code:`sky status` shows the cached status of the cluster, which can be out-dated for clusters with auto-stopping scheduled. To view a real status of the cluster with auto-stopping scheduled, use :code:`sky status --refresh`.
42+

docs/source/reference/cli.rst

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ Core
2929
:prog: sky status
3030
:nested: full
3131

32+
.. click:: sky.cli:autostop
33+
:prog: sky autostop
34+
:nested: full
3235

3336
Interactive Nodes
3437
------------

sky/backends/backend_utils.py

+69-87
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from sky import backends
2727
from sky import check as sky_check
2828
from sky import clouds
29+
from sky import global_user_state
2930
from sky import exceptions
3031
from sky import sky_logging
3132
from sky.adaptors import azure
@@ -40,6 +41,7 @@
4041
# NOTE: keep in sync with the cluster template 'file_mounts'.
4142
SKY_REMOTE_WORKDIR = log_lib.SKY_REMOTE_WORKDIR
4243
SKY_REMOTE_APP_DIR = '~/.sky/sky_app'
44+
SKY_RAY_YAML_REMOTE_PATH = '~/.sky/sky_ray.yml'
4345
IP_ADDR_REGEX = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
4446
SKY_REMOTE_RAY_VERSION = '1.10.0'
4547
SKY_REMOTE_PATH = '~/.sky/sky_wheels'
@@ -71,8 +73,6 @@ def _fill_template(template_name: str,
7173
raise FileNotFoundError(f'Template "{template_name}" does not exist.')
7274
with open(template_path) as fin:
7375
template = fin.read()
74-
template = jinja2.Template(template)
75-
content = template.render(**variables)
7676
if output_path is None:
7777
assert 'cluster_name' in variables, 'cluster_name is required.'
7878
cluster_name = variables['cluster_name']
@@ -81,6 +81,12 @@ def _fill_template(template_name: str,
8181
os.makedirs(output_path.parents[0], exist_ok=True)
8282
output_path = str(output_path)
8383
output_path = os.path.abspath(output_path)
84+
85+
# Add yaml file path to the template variables.
86+
variables['sky_ray_yaml_remote_path'] = SKY_RAY_YAML_REMOTE_PATH
87+
variables['sky_ray_yaml_local_path'] = output_path
88+
template = jinja2.Template(template)
89+
content = template.render(**variables)
8490
with open(output_path, 'w') as fout:
8591
fout.write(content)
8692
return output_path
@@ -1046,17 +1052,30 @@ def get_node_ips(
10461052
handle is not None and handle.head_ip is not None):
10471053
return [handle.head_ip]
10481054

1049-
out = run(f'ray get-head-ip {yaml_handle}',
1050-
stdout=subprocess.PIPE).stdout.decode().strip()
1051-
head_ip = re.findall(IP_ADDR_REGEX, out)
1052-
assert 1 == len(head_ip), out
1055+
try:
1056+
proc = run(f'ray get-head-ip {yaml_handle}',
1057+
stdout=subprocess.PIPE,
1058+
stderr=subprocess.PIPE)
1059+
out = proc.stdout.decode().strip()
1060+
head_ip = re.findall(IP_ADDR_REGEX, out)
1061+
except subprocess.CalledProcessError as e:
1062+
raise exceptions.FetchIPError(
1063+
exceptions.FetchIPError.Reason.HEAD) from e
1064+
if len(head_ip) != 1:
1065+
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD)
10531066

10541067
if expected_num_nodes > 1:
1055-
out = run(f'ray get-worker-ips {yaml_handle}',
1056-
stdout=subprocess.PIPE).stdout.decode()
1057-
worker_ips = re.findall(IP_ADDR_REGEX, out)
1058-
assert expected_num_nodes - 1 == len(worker_ips), (expected_num_nodes -
1059-
1, out)
1068+
try:
1069+
proc = run(f'ray get-worker-ips {yaml_handle}',
1070+
stdout=subprocess.PIPE,
1071+
stderr=subprocess.PIPE)
1072+
out = proc.stdout.decode()
1073+
worker_ips = re.findall(IP_ADDR_REGEX, out)
1074+
except subprocess.CalledProcessError as e:
1075+
raise exceptions.FetchIPError(
1076+
exceptions.FetchIPError.Reason.WORKER) from e
1077+
if len(worker_ips) != expected_num_nodes - 1:
1078+
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.WORKER)
10601079
else:
10611080
worker_ips = []
10621081
if return_private_ips:
@@ -1085,6 +1104,45 @@ def get_head_ip(
10851104
return head_ip
10861105

10871106

1107+
def _ping_cluster_or_set_to_stopped(
1108+
record: Dict[str, Any]) -> global_user_state.ClusterStatus:
1109+
handle = record['handle']
1110+
if not isinstance(handle, backends.CloudVmRayBackend.ResourceHandle):
1111+
return record
1112+
# Autostop is disabled for the cluster
1113+
if record['autostop'] < 0:
1114+
return record
1115+
cluster_name = handle.cluster_name
1116+
try:
1117+
get_node_ips(handle.cluster_yaml, handle.launched_nodes)
1118+
return record
1119+
except exceptions.FetchIPError as e:
1120+
# Set the cluster status to STOPPED, even the head node is still alive,
1121+
# since it will be stopped as soon as the workers are stopped.
1122+
logger.debug(f'Failed to get IPs from cluster {cluster_name}: {e}, '
1123+
'set to STOPPED')
1124+
global_user_state.remove_cluster(cluster_name, terminate=False)
1125+
auth_config = read_yaml(handle.cluster_yaml)['auth']
1126+
SSHConfigHelper.remove_cluster(cluster_name, handle.head_ip, auth_config)
1127+
return global_user_state.get_cluster_from_name(cluster_name)
1128+
1129+
1130+
def get_status_from_cluster_name(
1131+
cluster_name: str) -> Optional[global_user_state.ClusterStatus]:
1132+
record = global_user_state.get_cluster_from_name(cluster_name)
1133+
if record is None:
1134+
return None
1135+
record = _ping_cluster_or_set_to_stopped(record)
1136+
return record['status']
1137+
1138+
1139+
def get_clusters(refresh: bool) -> List[Dict[str, Any]]:
1140+
records = global_user_state.get_clusters()
1141+
if not refresh:
1142+
return records
1143+
return [_ping_cluster_or_set_to_stopped(record) for record in records]
1144+
1145+
10881146
def query_head_ip_with_retries(cluster_yaml: str, retry_count: int = 1) -> str:
10891147
"""Returns the ip of the head node from yaml file."""
10901148
for i in range(retry_count):
@@ -1120,82 +1178,6 @@ def get_backend_from_handle(
11201178
return backend
11211179

11221180

1123-
class JobLibCodeGen(object):
1124-
"""Code generator for job utility functions.
1125-
1126-
Usage:
1127-
1128-
>> codegen = JobLibCodeGen.add_job(...)
1129-
"""
1130-
1131-
_PREFIX = ['from sky.skylet import job_lib, log_lib']
1132-
1133-
@classmethod
1134-
def add_job(cls, job_name: str, username: str, run_timestamp: str) -> str:
1135-
if job_name is None:
1136-
job_name = '-'
1137-
code = [
1138-
'job_id = job_lib.add_job('
1139-
f'{job_name!r}, {username!r}, {run_timestamp!r})',
1140-
'print(job_id, flush=True)',
1141-
]
1142-
return cls._build(code)
1143-
1144-
@classmethod
1145-
def update_status(cls) -> str:
1146-
code = [
1147-
'job_lib.update_status()',
1148-
]
1149-
return cls._build(code)
1150-
1151-
@classmethod
1152-
def show_jobs(cls, username: Optional[str], all_jobs: bool) -> str:
1153-
code = [f'job_lib.show_jobs({username!r}, {all_jobs})']
1154-
return cls._build(code)
1155-
1156-
@classmethod
1157-
def cancel_jobs(cls, job_ids: Optional[List[int]]) -> str:
1158-
code = [f'job_lib.cancel_jobs({job_ids!r})']
1159-
return cls._build(code)
1160-
1161-
@classmethod
1162-
def fail_all_jobs_in_progress(cls) -> str:
1163-
# Used only for restarting a cluster.
1164-
code = ['job_lib.fail_all_jobs_in_progress()']
1165-
return cls._build(code)
1166-
1167-
@classmethod
1168-
def tail_logs(cls, job_id: int) -> str:
1169-
code = [
1170-
f'log_dir = job_lib.log_dir({job_id})',
1171-
f'log_lib.tail_logs({job_id}, log_dir)',
1172-
]
1173-
return cls._build(code)
1174-
1175-
@classmethod
1176-
def get_job_status(cls, job_id: str) -> str:
1177-
# Prints "Job <id> <status>" for UX; caller should parse the last token.
1178-
code = [
1179-
f'job_status = job_lib.get_status({job_id})',
1180-
f'print("Job", {job_id}, job_status.value, flush=True)',
1181-
]
1182-
return cls._build(code)
1183-
1184-
@classmethod
1185-
def get_log_path(cls, job_id: int) -> str:
1186-
code = [
1187-
f'log_dir = job_lib.log_dir({job_id})',
1188-
'print(log_dir, flush=True)',
1189-
]
1190-
return cls._build(code)
1191-
1192-
@classmethod
1193-
def _build(cls, code: List[str]) -> str:
1194-
code = cls._PREFIX + code
1195-
code = ';'.join(code)
1196-
return f'python3 -u -c {code!r}'
1197-
1198-
11991181
class NoOpConsole:
12001182
"""An empty class for multi-threaded console.status."""
12011183

0 commit comments

Comments
 (0)