Skip to content

Commit 3c31825

Browse files
Check long cluster names only for GCP; robustify smoke tests. (skypilot-org#966)
* Check long cluster names only for GCP; robustify smoke tests.
* test_id comment
* Recheck cluster name after cross-cloud retry.
* UX: sky spot cancel -a
* pylint
* Shorten (1) smoke test cluster names (2) gcp cluster name limit.
* Revert back timeout
* Fix merge error
* Make smoke test cluster names a hash; pass into py scripts.
* Remove redundant name gen
1 parent 7746339 commit 3c31825

File tree

7 files changed

+170
-120
lines changed

7 files changed

+170
-120
lines changed

Diff for: examples/multi_echo.py

+33-21
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,44 @@
22
import hashlib
33
from multiprocessing import pool
44
import socket
5+
import sys
6+
from typing import Optional
57

68
import sky
79

8-
# (username, last 4 chars of hash of hostname): for uniquifying users on
9-
# shared-account cloud providers.
10-
hostname_hash = hashlib.md5(socket.gethostname().encode()).hexdigest()[-4:]
11-
_user_and_host = f'{getpass.getuser()}-{hostname_hash}'
12-
cluster = f'test-multi-echo-{_user_and_host}'
1310

14-
# Create the cluster.
15-
with sky.Dag() as dag:
16-
cluster_resources = sky.Resources(sky.AWS(), accelerators={'K80': 1})
17-
task = sky.Task(num_nodes=2).set_resources(cluster_resources)
18-
# `detach_run` will only detach the `run` command. The provision and `setup` are
19-
# still blocking.
20-
sky.launch(dag, cluster_name=cluster, detach_run=True)
11+
def run(cluster: Optional[str] = None):
12+
if cluster is None:
13+
# (username, last 4 chars of hash of hostname): for uniquifying users on
14+
# shared-account cloud providers.
15+
hostname_hash = hashlib.md5(
16+
socket.gethostname().encode()).hexdigest()[-4:]
17+
_user_and_host = f'{getpass.getuser()}-{hostname_hash}'
18+
cluster = f'test-multi-echo-{_user_and_host}'
2119

22-
23-
# Submit multiple tasks in parallel to trigger queueing behaviors.
24-
def _exec(i):
20+
# Create the cluster.
2521
with sky.Dag() as dag:
26-
task = sky.Task(run=f'echo {i}; sleep 5')
27-
resources = sky.Resources(accelerators={'K80': 0.5})
28-
task.set_resources(resources)
29-
sky.exec(dag, cluster_name=cluster, detach_run=True)
22+
cluster_resources = sky.Resources(sky.AWS(), accelerators={'K80': 1})
23+
task = sky.Task(num_nodes=2).set_resources(cluster_resources)
24+
# `detach_run` will only detach the `run` command. The provision and
25+
# `setup` are still blocking.
26+
sky.launch(dag, cluster_name=cluster, detach_run=True)
27+
28+
# Submit multiple tasks in parallel to trigger queueing behaviors.
29+
def _exec(i):
30+
with sky.Dag() as dag:
31+
task = sky.Task(run=f'echo {i}; sleep 5')
32+
resources = sky.Resources(accelerators={'K80': 0.5})
33+
task.set_resources(resources)
34+
sky.exec(dag, cluster_name=cluster, detach_run=True)
35+
36+
with pool.ThreadPool(8) as p:
37+
list(p.imap(_exec, range(32)))
3038

3139

32-
with pool.ThreadPool(8) as p:
33-
list(p.imap(_exec, range(32)))
40+
if __name__ == '__main__':
41+
cluster = None
42+
if len(sys.argv) > 1:
43+
# For smoke test passing in a cluster name.
44+
cluster = sys.argv[1]
45+
run(cluster)

Diff for: examples/resnet_distributed_tf_app.py

+74-62
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,88 @@
11
import getpass
22
import hashlib
33
import socket
4-
import subprocess
4+
import sys
55
from typing import List, Optional
66

77
import sky
88

9-
# (username, last 4 chars of hash of hostname): for uniquifying users on
10-
# shared-account cloud providers.
11-
hostname_hash = hashlib.md5(socket.gethostname().encode()).hexdigest()[-4:]
12-
_user_and_host = f'{getpass.getuser()}-{hostname_hash}'
13-
cluster = f'test-distributed-tf-{_user_and_host}'
149

15-
with sky.Dag() as dag:
16-
# Total Nodes, INCLUDING Head Node
17-
num_nodes = 2
10+
def run(cluster: Optional[str] = None):
11+
if cluster is None:
12+
# (username, last 4 chars of hash of hostname): for uniquifying users on
13+
# shared-account cloud providers.
14+
hostname_hash = hashlib.md5(
15+
socket.gethostname().encode()).hexdigest()[-4:]
16+
_user_and_host = f'{getpass.getuser()}-{hostname_hash}'
17+
cluster = f'test-distributed-tf-{_user_and_host}'
1818

19-
# The setup command. Will be run under the working directory.
20-
setup = """
21-
git clone https://github.com/concretevitamin/tpu || true
22-
cd tpu && git checkout 9459fee
23-
conda create -n resnet python=3.7 -y
24-
conda activate resnet
25-
conda install cudatoolkit=11.0 -y
26-
pip install tensorflow==2.4.0 pyyaml
27-
pip install protobuf==3.20
28-
cd models && pip install -e .
29-
"""
19+
with sky.Dag() as dag:
20+
# Total Nodes, INCLUDING Head Node
21+
num_nodes = 2
3022

31-
# The command to run. Will be run under the working directory.
32-
# If a str, run the same command on all nodes.
33-
# Generates per-node commands. Must be a self-contained lambda
34-
# that doesn't refer to any external variables.
35-
#
36-
# Will be run under the working directory.
37-
def run_fn(node_rank: int, ip_list: List[str]) -> Optional[str]:
38-
import json
39-
tf_config = {
40-
'cluster': {
41-
'worker': [ip + ':8008' for ip in ip_list]
42-
},
43-
'task': {
44-
'type': 'worker',
45-
'index': node_rank
23+
# The setup command. Will be run under the working directory.
24+
setup = """
25+
git clone https://github.com/concretevitamin/tpu || true
26+
cd tpu && git checkout 9459fee
27+
conda create -n resnet python=3.7 -y
28+
conda activate resnet
29+
conda install cudatoolkit=11.0 -y
30+
pip install tensorflow==2.4.0 pyyaml
31+
pip install protobuf==3.20
32+
cd models && pip install -e .
33+
"""
34+
35+
# The command to run. Will be run under the working directory.
36+
# If a str, run the same command on all nodes.
37+
# Generates per-node commands. Must be a self-contained lambda
38+
# that doesn't refer to any external variables.
39+
#
40+
# Will be run under the working directory.
41+
def run_fn(node_rank: int, ip_list: List[str]) -> Optional[str]:
42+
import json
43+
tf_config = {
44+
'cluster': {
45+
'worker': [ip + ':8008' for ip in ip_list]
46+
},
47+
'task': {
48+
'type': 'worker',
49+
'index': node_rank
50+
}
4651
}
47-
}
48-
str_tf_config = json.dumps(tf_config)
49-
print(f'{str_tf_config!r}')
50-
run = f"""
51-
cd tpu
52-
conda activate resnet
53-
rm -rf resnet_model-dir
54-
export TF_CONFIG={str_tf_config!r}
55-
export XLA_FLAGS='--xla_gpu_cuda_data_dir=/usr/local/cuda/'
56-
python models/official/resnet/resnet_main.py --use_tpu=False \
57-
--mode=train --train_batch_size=256 --train_steps=500 \
58-
--iterations_per_loop=125 \
59-
--data_dir=gs://cloud-tpu-test-datasets/fake_imagenet \
60-
--model_dir=resnet-model-dir \
61-
--amp --xla --loss_scale=128"""
62-
return run
52+
str_tf_config = json.dumps(tf_config)
53+
print(f'{str_tf_config!r}')
54+
run = f"""
55+
cd tpu
56+
conda activate resnet
57+
rm -rf resnet_model-dir
58+
export TF_CONFIG={str_tf_config!r}
59+
export XLA_FLAGS='--xla_gpu_cuda_data_dir=/usr/local/cuda/'
60+
python models/official/resnet/resnet_main.py --use_tpu=False \
61+
--mode=train --train_batch_size=256 --train_steps=500 \
62+
--iterations_per_loop=125 \
63+
--data_dir=gs://cloud-tpu-test-datasets/fake_imagenet \
64+
--model_dir=resnet-model-dir \
65+
--amp --xla --loss_scale=128"""
66+
return run
67+
68+
train = sky.Task(
69+
'train',
70+
setup=setup,
71+
num_nodes=num_nodes,
72+
run=run_fn,
73+
)
74+
75+
train.set_inputs('gs://cloud-tpu-test-datasets/fake_imagenet',
76+
estimated_size_gigabytes=70)
77+
train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
78+
train.set_resources(sky.Resources(sky.AWS(), accelerators='V100'))
6379

64-
train = sky.Task(
65-
'train',
66-
setup=setup,
67-
num_nodes=num_nodes,
68-
run=run_fn,
69-
)
80+
sky.launch(dag, cluster_name=cluster)
7081

71-
train.set_inputs('gs://cloud-tpu-test-datasets/fake_imagenet',
72-
estimated_size_gigabytes=70)
73-
train.set_outputs('resnet-model-dir', estimated_size_gigabytes=0.1)
74-
train.set_resources(sky.Resources(sky.AWS(), accelerators='V100'))
7582

76-
sky.launch(dag, cluster_name=cluster)
83+
if __name__ == '__main__':
84+
cluster = None
85+
if len(sys.argv) > 1:
86+
# For smoke test passing in a cluster name.
87+
cluster = sys.argv[1]
88+
run(cluster)

Diff for: sky/backends/backend_utils.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,13 @@
8989
_TEST_IP = 'https://8.8.8.8'
9090

9191
# GCP has a 63 char limit; however, Ray autoscaler adds many
92-
# characters. Through testing, 37 chars is the maximum length for the Sky
93-
# cluster name on GCP. Ref:
92+
# characters. Through testing, this is the maximum length for the Sky cluster
93+
# name on GCP. Ref:
9494
# https://cloud.google.com/compute/docs/naming-resources#resource-name-format
95-
_MAX_CLUSTER_NAME_LEN = 37
95+
# NOTE: actually 37 is maximum for a single-node cluster which gets the suffix
96+
# '-head', but 35 for a multinode cluster because workers get the suffix
97+
# '-worker'. Here we do not distinguish these cases and take the lower limit.
98+
_MAX_CLUSTER_NAME_LEN_FOR_GCP = 35
9699

97100
# Allow each CPU thread take 2 tasks.
98101
# Note: This value cannot be too small, otherwise OOM issue may occur.
@@ -796,7 +799,9 @@ def wait_until_ray_cluster_ready(
796799
nodes_so_far != num_nodes):
797800
worker_status.stop()
798801
logger.error(
799-
'Timed out when waiting for workers to be provisioned.')
802+
'Timed out: waited for workers to be provisioned '
803+
f'for more than {nodes_launching_progress_timeout} seconds.'
804+
)
800805
return False # failed
801806

802807
if '(no pending nodes)' in output and '(no failures)' in output:
@@ -1555,7 +1560,8 @@ def get_task_resources_str(task: 'task_lib.Task') -> str:
15551560
return resources_str
15561561

15571562

1558-
def check_cluster_name_is_valid(cluster_name: str) -> None:
1563+
def check_cluster_name_is_valid(cluster_name: str,
1564+
cloud: Optional[clouds.Cloud] = None) -> None:
15591565
"""Errors out on invalid cluster names not supported by cloud providers.
15601566
15611567
Bans (including but not limited to) names that:
@@ -1564,19 +1570,23 @@ def check_cluster_name_is_valid(cluster_name: str) -> None:
15641570
"""
15651571
if cluster_name is None:
15661572
return
1567-
# GCP errors return this exact regex. An informal description is also at:
1573+
# GCP errors return this exact regex. An informal description is at:
15681574
# https://cloud.google.com/compute/docs/naming-resources#resource-name-format
15691575
valid_regex = '[a-z]([-a-z0-9]{0,61}[a-z0-9])?'
15701576
if re.fullmatch(valid_regex, cluster_name) is None:
15711577
with ux_utils.print_exception_no_traceback():
15721578
raise ValueError(
15731579
f'Cluster name "{cluster_name}" is invalid; '
15741580
f'ensure it is fully matched by regex: {valid_regex}')
1575-
if len(cluster_name) > _MAX_CLUSTER_NAME_LEN:
1576-
with ux_utils.print_exception_no_traceback():
1577-
raise ValueError(
1578-
f'Cluster name {cluster_name!r} has {len(cluster_name)}'
1579-
f' chars; maximum length is {_MAX_CLUSTER_NAME_LEN} chars.')
1581+
if isinstance(cloud, clouds.GCP):
1582+
# GCP has too restrictive of a length limit. Don't check for other
1583+
# clouds.
1584+
if len(cluster_name) > _MAX_CLUSTER_NAME_LEN_FOR_GCP:
1585+
with ux_utils.print_exception_no_traceback():
1586+
raise ValueError(
1587+
f'Cluster name {cluster_name!r} has {len(cluster_name)}'
1588+
f' chars; maximum length is {_MAX_CLUSTER_NAME_LEN_FOR_GCP}'
1589+
' chars.')
15801590

15811591

15821592
def check_cluster_name_not_reserved(

Diff for: sky/backends/cloud_vm_ray_backend.py

+13-5
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ def add_ray_task(
350350
if script is not None:
351351
sky_env_vars_dict['SKY_NODE_RANK'] = {gang_scheduling_id!r}
352352
sky_env_vars_dict['SKY_JOB_ID'] = {self.job_id}
353-
353+
354354
futures.append(run_bash_command_with_log \\
355355
.options({name_str}{cpu_str}{resources_str}{num_gpus_str}) \\
356356
.remote(
@@ -1187,6 +1187,15 @@ def provision_with_retries(
11871187
while provision_failed:
11881188
provision_failed = False
11891189
try:
1190+
try:
1191+
# Recheck cluster name as the 'except:' block below may
1192+
# change the cloud assignment.
1193+
backend_utils.check_cluster_name_is_valid(
1194+
cluster_name, to_provision.cloud)
1195+
except ValueError as value_error:
1196+
# Let failover below handle this (i.e., block this cloud).
1197+
raise exceptions.ResourcesUnavailableError(
1198+
) from value_error
11901199
config_dict = self._retry_region_zones(
11911200
to_provision,
11921201
num_nodes,
@@ -1449,26 +1458,25 @@ def _provision(self,
14491458
cluster_name: str,
14501459
retry_until_up: bool = False) -> ResourceHandle:
14511460
"""Provisions using 'ray up'."""
1452-
# Try to launch the existing cluster first
1453-
backend_utils.check_cluster_name_is_valid(cluster_name)
1454-
# ray up: the VMs.
14551461
# FIXME: ray up for Azure with different cluster_names will overwrite
14561462
# each other.
1457-
14581463
lock_path = os.path.expanduser(
14591464
backend_utils.CLUSTER_STATUS_LOCK_PATH.format(cluster_name))
14601465
with timeline.FileLockEvent(lock_path):
14611466
to_provision_config = RetryingVmProvisioner.ToProvisionConfig(
14621467
cluster_name, to_provision, task.num_nodes)
14631468
prev_cluster_status = None
14641469
if not dryrun: # dry run doesn't need to check existing cluster.
1470+
# Try to launch the existing cluster first
14651471
to_provision_config = self._check_existing_cluster(
14661472
task, to_provision, cluster_name)
14671473
prev_cluster_status, _ = (
14681474
backend_utils.refresh_cluster_status_handle(
14691475
cluster_name, acquire_per_cluster_status_lock=False))
14701476
assert to_provision_config.resources is not None, (
14711477
'to_provision should not be None', to_provision_config)
1478+
backend_utils.check_cluster_name_is_valid(
1479+
cluster_name, to_provision_config.resources.cloud)
14721480

14731481
usage_lib.messages.usage.update_cluster_resources(
14741482
to_provision_config.num_nodes, to_provision_config.resources)

Diff for: sky/cli.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -2591,14 +2591,14 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool):
25912591
f'Provided {argument_str!r}.')
25922592

25932593
if not yes:
2594-
job_identity_str = f'with IDs {job_id_str}' if job_ids else repr(name)
2594+
job_identity_str = (f'managed spot jobs with IDs {job_id_str}'
2595+
if job_ids else repr(name))
25952596
if all:
25962597
job_identity_str = 'all managed spot jobs'
2597-
click.confirm(
2598-
f'Cancelling managed spot job {job_identity_str}. Proceed?',
2599-
default=True,
2600-
abort=True,
2601-
show_default=True)
2598+
click.confirm(f'Cancelling {job_identity_str}. Proceed?',
2599+
default=True,
2600+
abort=True,
2601+
show_default=True)
26022602

26032603
backend = backend_utils.get_backend_from_handle(handle)
26042604
assert isinstance(backend, backends.CloudVmRayBackend)

Diff for: sky/usage/usage_lib.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Logging events to Grafana Loki"""
1+
"""Logging events to Grafana Loki."""
22

33
import enum
44
import click

0 commit comments

Comments
 (0)