Skip to content

Commit f37ffd4

Browse files
Polish workdir/file_mounts validation and logging. (skypilot-org#495)
* Polish workdir/file_mounts validation and logging. * Fix cloud URIs being displayed with an extra slash Previous: gs://cloud-tpu-test-datasets/fake_imagenet/train-00001-of-01024/ -> /train-00001-of-01024 Now: gs://cloud-tpu-test-datasets/fake_imagenet/train-00001-of-01024 -> /train-00001-of-01024 * Fail early for non-existent local file mount sources. * !r
1 parent e24e270 commit f37ffd4

File tree

4 files changed

+56
-35
lines changed

4 files changed

+56
-35
lines changed

sky/backends/backend.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
"""Sky backend interface."""
2-
from typing import Any, Callable, Dict, Optional
2+
from typing import Dict, Optional
33

44
from sky import resources
55
from sky import task as task_lib
66

77
Task = task_lib.Task
88
Resources = resources.Resources
99
Path = str
10-
PostSetupFn = Callable[[str], Any]
1110

1211

1312
class Backend:

sky/backends/cloud_vm_ray_backend.py

+39-31
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import tempfile
1414
import textwrap
1515
import time
16-
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
16+
from typing import Dict, List, Optional, Tuple, Union
1717

1818
import colorama
1919
from rich import console as rich_console
@@ -34,12 +34,10 @@
3434

3535
Dag = dag_lib.Dag
3636
OptimizeTarget = optimizer.OptimizeTarget
37+
Path = str
3738
Resources = resources_lib.Resources
3839
Task = task_lib.Task
3940

40-
Path = str
41-
PostSetupFn = Callable[[str], Any]
42-
SKY_DIRSIZE_WARN_THRESHOLD = 100
4341
SKY_REMOTE_APP_DIR = backend_utils.SKY_REMOTE_APP_DIR
4442
SKY_REMOTE_WORKDIR = backend_utils.SKY_REMOTE_WORKDIR
4543
SKY_LOGS_DIRECTORY = job_lib.SKY_LOGS_DIRECTORY
@@ -49,6 +47,8 @@
4947
logger = sky_logging.init_logger(__name__)
5048
console = rich_console.Console()
5149

50+
_PATH_SIZE_MEGABYTES_WARN_THRESHOLD = 256
51+
5252

5353
def _check_cluster_name_is_valid(cluster_name: str) -> None:
5454
"""Errors out on invalid cluster names not supported by cloud providers.
@@ -104,7 +104,7 @@ def _remove_cluster_from_ssh_config(cluster_ip: str,
104104

105105

106106
def _path_size_megabytes(path: str) -> int:
107-
# Returns the size of files occupied in path in megabytes
107+
"""Returns the size of 'path' (directory or file) in megabytes."""
108108
return int(
109109
subprocess.check_output(['du', '-sh', '-m',
110110
path]).split()[0].decode('utf-8'))
@@ -1128,22 +1128,24 @@ def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
11281128
ip_list = self._get_node_ips(handle.cluster_yaml, handle.launched_nodes)
11291129
full_workdir = os.path.abspath(os.path.expanduser(workdir))
11301130

1131-
# Raise warning if directory is too large
1132-
if not os.path.exists(full_workdir):
1133-
logger.error(f'{fore.RED}Workdir {workdir} does not exist.'
1134-
f'{style.RESET_ALL}')
1135-
sys.exit(1)
1136-
dir_size = _path_size_megabytes(full_workdir)
1137-
if dir_size >= SKY_DIRSIZE_WARN_THRESHOLD:
1138-
logger.warning(f'{fore.YELLOW}The size of workdir {workdir} '
1139-
f'is {dir_size} MB. Try to keep workdir small, as '
1140-
f'large sizes will slowdown rsync.{style.RESET_ALL}')
1131+
# These asserts have been validated at Task construction time.
1132+
assert os.path.exists(full_workdir), f'{full_workdir} does not exist'
11411133
if os.path.islink(full_workdir):
11421134
logger.warning(
1143-
f'{fore.YELLOW}Workdir {workdir} is a symlink. '
1135+
f'{fore.YELLOW}Workdir {workdir!r} is a symlink. '
11441136
f'Symlink contents are not uploaded.{style.RESET_ALL}')
11451137
else:
1146-
workdir = f'{workdir}/'
1138+
assert os.path.isdir(
1139+
full_workdir), f'{full_workdir} should be a directory.'
1140+
workdir = os.path.join(workdir, '') # Adds trailing / if needed.
1141+
1142+
# Raise warning if directory is too large
1143+
dir_size = _path_size_megabytes(full_workdir)
1144+
if dir_size >= _PATH_SIZE_MEGABYTES_WARN_THRESHOLD:
1145+
logger.warning(
1146+
f'{fore.YELLOW}The size of workdir {workdir!r} '
1147+
f'is {dir_size} MB. Try to keep workdir small, as '
1148+
f'large sizes will slow down rsync.{style.RESET_ALL}')
11471149

11481150
def _sync_workdir_node(ip):
11491151
self._rsync_up(handle,
@@ -1157,8 +1159,11 @@ def _sync_workdir_node(ip):
11571159

11581160
num_nodes = handle.launched_nodes
11591161
plural = 's' if num_nodes > 1 else ''
1160-
logger.info(f'{fore.CYAN}Syncing (on {num_nodes} node{plural}): '
1161-
f'{style.BRIGHT}workdir ({workdir}){style.RESET_ALL}.')
1162+
logger.info(
1163+
f'{fore.CYAN}Syncing workdir (to {num_nodes} node{plural}): '
1164+
f'{style.BRIGHT}{workdir}{style.RESET_ALL}'
1165+
f' -> '
1166+
f'{style.BRIGHT}{SKY_REMOTE_WORKDIR}{style.RESET_ALL}')
11621167
with console.status('[bold cyan]Syncing[/]'):
11631168
backend_utils.run_in_parallel(_sync_workdir_node, ip_list)
11641169

@@ -1192,9 +1197,13 @@ def sync_to_all_nodes(src: str,
11921197
dst: str,
11931198
command: Optional[str] = None,
11941199
run_rsync: Optional[bool] = False):
1195-
full_src = os.path.abspath(os.path.expanduser(src))
1196-
if not os.path.islink(full_src) and not os.path.isfile(full_src):
1197-
src = f'{src}/'
1200+
if run_rsync:
1201+
# Do this for local src paths, not for cloud store URIs
1202+
# (otherwise we have '<abs path to cwd>/gs://.../object/').
1203+
full_src = os.path.abspath(os.path.expanduser(src))
1204+
if not os.path.islink(full_src) and not os.path.isfile(
1205+
full_src):
1206+
src = os.path.join(src, '') # Adds trailing / if needed.
11981207

11991208
def _sync_node(ip):
12001209
if command is not None:
@@ -1225,28 +1234,27 @@ def _sync_node(ip):
12251234

12261235
num_nodes = handle.launched_nodes
12271236
plural = 's' if num_nodes > 1 else ''
1228-
logger.info(f'{fore.CYAN}Syncing (on {num_nodes} node{plural}): '
1229-
f'{style.BRIGHT}{src} -> {dst}{style.RESET_ALL}')
1237+
logger.info(f'{fore.CYAN}Syncing (to {num_nodes} node{plural}): '
1238+
f'{style.BRIGHT}{src}{style.RESET_ALL} -> '
1239+
f'{style.BRIGHT}{dst}{style.RESET_ALL}')
12301240
with console.status('[bold cyan]Syncing[/]'):
12311241
backend_utils.run_in_parallel(_sync_node, ip_list)
12321242

12331243
# Check the files and warn
12341244
for dst, src in mounts.items():
12351245
if not task_lib.is_cloud_store_url(src):
12361246
full_src = os.path.abspath(os.path.expanduser(src))
1237-
if not os.path.exists(full_src):
1238-
logger.error(f'{fore.RED}Directory "{src}" does not exist.'
1239-
f'{style.RESET_ALL}')
1240-
sys.exit(1)
1247+
# Checked during Task.set_file_mounts().
1248+
assert os.path.exists(full_src), f'{full_src} does not exist.'
12411249
src_size = _path_size_megabytes(full_src)
1242-
if src_size >= SKY_DIRSIZE_WARN_THRESHOLD:
1250+
if src_size >= _PATH_SIZE_MEGABYTES_WARN_THRESHOLD:
12431251
logger.warning(
1244-
f'{fore.YELLOW}The size of file mount src {src} '
1252+
f'{fore.YELLOW}The size of file mount src {src!r} '
12451253
f'is {src_size} MB. Try to keep src small, as '
12461254
f'large sizes will slow down rsync.{style.RESET_ALL}')
12471255
if os.path.islink(full_src):
12481256
logger.warning(
1249-
f'{fore.YELLOW}Source path {src} is a symlink. '
1257+
f'{fore.YELLOW}Source path {src!r} is a symlink. '
12501258
f'Symlink contents are not uploaded.{style.RESET_ALL}')
12511259

12521260
for dst, src in mounts.items():

sky/backends/local_docker_backend.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Local docker backend for sky"""
22
import subprocess
33
import tempfile
4-
from typing import Any, Callable, Dict, Optional, Union
4+
from typing import Dict, Optional, Union
55

66
import colorama
77

@@ -17,7 +17,6 @@
1717
Task = task_lib.Task
1818
Resources = resources.Resources
1919
Path = str
20-
PostSetupFn = Callable[[str], Any]
2120

2221
logger = sky_logging.init_logger(__name__)
2322

sky/task.py

+15
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def __init__(
132132
dag.add(self)
133133

134134
def _validate(self):
135+
"""Checks if the Task fields are valid."""
135136
if not _is_valid_name(self.name):
136137
raise ValueError(f'Invalid task name {self.name}. Valid name: '
137138
f'{_VALID_NAME_DESCR}')
@@ -169,6 +170,13 @@ def _validate(self):
169170
f'a command generator ({CommandGen}). '
170171
f'Got {type(self.run)}')
171172

173+
# Workdir.
174+
if self.workdir is not None and not os.path.isdir(self.workdir):
175+
# Symlink to a dir is legal (isdir() follows symlinks).
176+
raise ValueError(
177+
'Workdir must exist and must be a directory (or '
178+
f'a symlink to a directory). Found: {self.workdir}')
179+
172180
@staticmethod
173181
def from_yaml(yaml_path):
174182
with open(os.path.expanduser(yaml_path), 'r') as f:
@@ -478,6 +486,13 @@ def set_file_mounts(self, file_mounts: Optional[Dict[str, str]]) -> None:
478486
if is_cloud_store_url(target):
479487
raise ValueError(
480488
'File mount destination paths cannot be cloud storage')
489+
if not is_cloud_store_url(source):
490+
if not os.path.exists(
491+
os.path.abspath(os.path.expanduser(source))):
492+
raise ValueError(
493+
f'File mount source {source!r} does not exist locally. '
494+
'To fix: check if it exists, and correct the path.')
495+
481496
self.file_mounts = file_mounts
482497
return self
483498

0 commit comments

Comments
 (0)