
Commit 6fad18a

Parallel sky down (skypilot-org#659)
* fix multi-thread
* refactor
* Address comment
* format
* hidden variable
* Progress bar for termination
* fix
* format
* mitigate logging problem
* rename
1 parent e36d5a7 commit 6fad18a

File tree

5 files changed: +150 −101 lines changed

sky/backends/backend_utils.py

+18
@@ -11,6 +11,7 @@
 import subprocess
 import sys
 import textwrap
+import threading
 import time
 import typing
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@@ -1188,3 +1189,20 @@ def _build(cls, code: List[str]) -> str:
         code = cls._PREFIX + code
         code = ';'.join(code)
         return f'python3 -u -c {code!r}'
+
+
+class NoOpConsole:
+    """An empty class for multi-threaded console.status."""
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+
+def safe_console_status(msg: str):
+    """A wrapper for multi-threaded console.status."""
+    if threading.current_thread() is threading.main_thread():
+        return console.status(msg)
+    return NoOpConsole()
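Context for the helper above: rich allows only one live display at a time, so driving console.status from worker threads is unsafe; safe_console_status shows the spinner only on the main thread and hands every other thread the no-op context manager. Below is a minimal, self-contained sketch of the two code paths. The _sync_node helper, the IP list, and the use of ThreadPoolExecutor are illustrative stand-ins, not code from this commit.

import threading
from concurrent.futures import ThreadPoolExecutor

from rich import console as rich_console

console = rich_console.Console()


class NoOpConsole:
    """No-op stand-in for console.status on non-main threads."""

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


def safe_console_status(msg: str):
    """Return a live spinner on the main thread, a no-op elsewhere."""
    if threading.current_thread() is threading.main_thread():
        return console.status(msg)
    return NoOpConsole()


def _sync_node(ip: str) -> None:
    # Worker thread: gets NoOpConsole, so no second live display is started.
    with safe_console_status(f'[bold cyan]Syncing {ip}[/]'):
        pass  # per-node work would go here


# Main thread: a single spinner covers the whole parallel operation.
with safe_console_status('[bold cyan]Syncing[/]'):
    with ThreadPoolExecutor() as pool:
        list(pool.map(_sync_node, ['10.0.0.1', '10.0.0.2']))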

sky/backends/cloud_vm_ray_backend.py

+23 −18
@@ -20,7 +20,6 @@
 
 import colorama
 import filelock
-from rich import console as rich_console
 
 import sky
 from sky import backends
@@ -48,7 +47,6 @@
 SKY_REMOTE_RAY_VERSION = backend_utils.SKY_REMOTE_RAY_VERSION
 
 logger = sky_logging.init_logger(__name__)
-console = rich_console.Console()
 
 _PATH_SIZE_MEGABYTES_WARN_THRESHOLD = 256
 
@@ -726,8 +724,9 @@ def _try_provision_tpu(self, to_provision: 'resources_lib.Resources',
         assert 'tpu-create-script' in config_dict, \
             'Expect TPU provisioning with gcloud.'
         try:
-            with console.status('[bold cyan]Provisioning TPU '
-                                f'[green]{tpu_name}[/]'):
+            with backend_utils.safe_console_status(
+                    '[bold cyan]Provisioning TPU '
+                    f'[green]{tpu_name}[/]'):
                 backend_utils.run(f'bash {config_dict["tpu-create-script"]}',
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
@@ -1317,7 +1316,8 @@ def provision(self,
             # PENDING / RUNNING jobs for the real status, since we do not
             # know the actual previous status of the cluster.
             cmd = backend_utils.JobLibCodeGen.update_status()
-            with console.status('[bold cyan]Preparing Job Queue'):
+            with backend_utils.safe_console_status(
+                    '[bold cyan]Preparing Job Queue'):
                 returncode, _, stderr = self.run_on_head(
                     handle, cmd, require_outputs=True)
             backend_utils.handle_returncode(returncode, cmd,
@@ -1403,7 +1403,7 @@ def _sync_workdir_node(ip):
         tail_cmd = f'tail -n100 -f {log_path}'
         logger.info('To view detailed progress: '
                     f'{style.BRIGHT}{tail_cmd}{style.RESET_ALL}')
-        with console.status('[bold cyan]Syncing[/]'):
+        with backend_utils.safe_console_status('[bold cyan]Syncing[/]'):
             backend_utils.run_in_parallel(_sync_workdir_node, ip_list)
 
     def sync_file_mounts(
@@ -1478,7 +1478,7 @@ def _sync_node(ip):
         logger.info(f'{fore.CYAN}Syncing (to {num_nodes} node{plural}): '
                     f'{style.BRIGHT}{src}{style.RESET_ALL} -> '
                     f'{style.BRIGHT}{dst}{style.RESET_ALL}')
-        with console.status('[bold cyan]Syncing[/]'):
+        with backend_utils.safe_console_status('[bold cyan]Syncing[/]'):
             backend_utils.run_in_parallel(_sync_node, ip_list)
 
         # Check the files and warn
@@ -1633,7 +1633,8 @@ def _setup_node(ip: int) -> int:
         plural = 's' if num_nodes > 1 else ''
         logger.info(f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
                     f'{style.RESET_ALL}')
-        with console.status('[bold cyan]Running setup[/]'):
+        with backend_utils.safe_console_status(
+                '[bold cyan]Running setup[/]'):
             backend_utils.run_in_parallel(_setup_node, ip_list)
         logger.info(f'{fore.GREEN}Setup completed.{style.RESET_ALL}')
         end = time.time()
@@ -1989,8 +1990,8 @@ def teardown_no_lock(self,
            # autoscaler.
            resource_group = config['provider']['resource_group']
            terminate_cmd = f'az group delete -y --name {resource_group}'
-           with console.status(f'[bold cyan]Terminating '
-                               f'[green]{cluster_name}'):
+           with backend_utils.safe_console_status(f'[bold cyan]Terminating '
+                                                  f'[green]{cluster_name}'):
                returncode, stdout, stderr = log_lib.run_with_log(
                    terminate_cmd,
                    log_abs_path,
@@ -2014,8 +2015,9 @@ def teardown_no_lock(self,
            terminate_cmd = (
                f'aws ec2 terminate-instances --region {region} '
                f'--instance-ids $({query_cmd})')
-           with console.status(f'[bold cyan]Terminating '
-                               f'[green]{cluster_name}'):
+           with backend_utils.safe_console_status(
+                   f'[bold cyan]Terminating '
+                   f'[green]{cluster_name}'):
                returncode, stdout, stderr = log_lib.run_with_log(
                    terminate_cmd,
                    log_abs_path,
@@ -2031,8 +2033,9 @@ def teardown_no_lock(self,
            terminate_cmd = (
                f'gcloud compute instances delete --zone={zone} --quiet '
                f'$({query_cmd})')
-           with console.status(f'[bold cyan]Terminating '
-                               f'[green]{cluster_name}'):
+           with backend_utils.safe_console_status(
+                   f'[bold cyan]Terminating '
+                   f'[green]{cluster_name}'):
                returncode, stdout, stderr = log_lib.run_with_log(
                    terminate_cmd,
                    log_abs_path,
@@ -2052,16 +2055,18 @@ def teardown_no_lock(self,
            f.flush()
 
            teardown_verb = 'Terminating' if terminate else 'Stopping'
-           with console.status(f'[bold cyan]{teardown_verb} '
-                               f'[green]{cluster_name}'):
+           with backend_utils.safe_console_status(
+                   f'[bold cyan]{teardown_verb} '
+                   f'[green]{cluster_name}'):
                returncode, stdout, stderr = log_lib.run_with_log(
                    ['ray', 'down', '-y', f.name],
                    log_abs_path,
                    stream_logs=False,
                    require_outputs=True)
 
        if handle.tpu_delete_script is not None:
-           with console.status('[bold cyan]Terminating TPU...'):
+           with backend_utils.safe_console_status(
+                   '[bold cyan]Terminating TPU...'):
                tpu_rc, tpu_stdout, tpu_stderr = log_lib.run_with_log(
                    ['bash', handle.tpu_delete_script],
                    log_abs_path,
@@ -2098,7 +2103,7 @@ def teardown_no_lock(self,
        backend_utils.SSHConfigHelper.remove_cluster(cluster_name,
                                                     handle.head_ip,
                                                     auth_config)
-       name = global_user_state.get_cluster_name_from_handle(handle)
+       name = handle.cluster_name
        global_user_state.remove_cluster(name, terminate=terminate)
 
        if terminate:

sky/cli.py

+28 −15
@@ -40,7 +40,7 @@
 
 import click
 import pendulum
-from rich import console as rich_console
+from rich import progress as rich_progress
 
 import sky
 from sky import backends
@@ -59,7 +59,6 @@
 from sky.backends import backend as backend_lib
 
 logger = sky_logging.init_logger(__name__)
-console = rich_console.Console()
 
 _CLUSTER_FLAG_HELP = """\
 A cluster name. If provided, either reuse an existing cluster with that name or
@@ -1302,32 +1301,46 @@ def _terminate_or_stop_clusters(names: Tuple[str],
         abort=True,
         show_default=True)
 
-    for record in to_down:  # TODO: parallelize.
+    progress = rich_progress.Progress(transient=True)
+    operation = 'Terminating' if terminate else 'Stopping'
+    plural = 's' if len(to_down) > 1 else ''
+    task = progress.add_task(
+        f'[bold cyan]{operation} {len(to_down)} cluster{plural}[/]',
+        total=len(to_down))
+    progress.start()
+
+    def _terminate_or_stop(record):
         name = record['name']
         handle = record['handle']
         backend = backend_utils.get_backend_from_handle(handle)
         if (isinstance(backend, backends.CloudVmRayBackend) and
                 handle.launched_resources.use_spot and not terminate):
+            # Disable spot instances to be stopped.
             # TODO(suquark): enable GCP+spot to be stopped in the future.
+            progress.stop()
             click.secho(
                 f'Stopping cluster {name}... skipped, because spot instances '
                 'may lose attached volumes. ',
                 fg='green')
             click.echo('  To terminate the cluster, run: ', nl=False)
             click.secho(f'sky down {name}', bold=True)
-            continue
-        success = backend.teardown(handle, terminate=terminate, purge=purge)
-        operation = 'Terminating' if terminate else 'Stopping'
-        if success:
-            click.secho(f'{operation} cluster {name}...done.', fg='green')
-            if not terminate:
-                click.echo('  To restart the cluster, run: ', nl=False)
-                click.secho(f'sky start {name}', bold=True)
         else:
-            click.secho(
-                f'{operation} cluster {name}...failed. '
-                'Please check the logs and try again.',
-                fg='red')
+            success = backend.teardown(handle, terminate=terminate, purge=purge)
+            progress.stop()
+            if success:
+                click.secho(f'{operation} cluster {name}...done.', fg='green')
+                if not terminate:
+                    click.echo('  To restart the cluster, run: ', nl=False)
+                    click.secho(f'sky start {name}', bold=True)
+            else:
+                click.secho(
+                    f'{operation} cluster {name}...failed. '
+                    'Please check the logs and try again.',
+                    fg='red')
+            progress.update(task, advance=1)
+            progress.start()
+
+    backend_utils.run_in_parallel(_terminate_or_stop, to_down)
 
 
 @_interactive_node_cli_command
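The new flow above drives a single rich progress bar from the main thread while run_in_parallel fans the per-cluster teardowns out to workers; each worker pauses the live bar with progress.stop() before printing its own status line, then advances the task and restarts the bar. A standalone sketch of that pattern follows. The run_in_parallel stub is only an assumption of roughly thread-pool-map behavior (its real implementation lives in backend_utils and is not part of this diff); the cluster names and the sleep are placeholders.

import time
from concurrent.futures import ThreadPoolExecutor

from rich import progress as rich_progress


def run_in_parallel(func, args):
    # Assumed behavior only; SkyPilot's backend_utils.run_in_parallel may differ.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, args))


clusters = ['cluster-a', 'cluster-b', 'cluster-c']
progress = rich_progress.Progress(transient=True)
task = progress.add_task(
    f'[bold cyan]Terminating {len(clusters)} clusters[/]', total=len(clusters))
progress.start()


def _terminate(name: str) -> None:
    time.sleep(1)  # stand-in for backend.teardown(...)
    # Pause the live bar so the per-cluster message prints on its own line,
    # then advance the task and resume the bar.
    progress.stop()
    print(f'Terminating cluster {name}...done.')
    progress.update(task, advance=1)
    progress.start()


run_in_parallel(_terminate, clusters)
progress.stop()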
