Skip to content

Commit 4ece4e6

Browse files
authored
Add line processor abstraction and fix gitignored path size (skypilot-org#615)
1 parent 3724532 commit 4ece4e6

File tree

3 files changed

+129
-124
lines changed

3 files changed

+129
-124
lines changed

sky/backends/cloud_vm_ray_backend.py

+28-54
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from sky import task as task_lib
3333
from sky.backends import backend_utils
3434
from sky.backends import wheel_utils
35-
from sky.skylet import autostop_lib, job_lib, log_lib
35+
from sky.skylet import autostop_lib, job_lib, log_lib, log_utils
3636

3737
if typing.TYPE_CHECKING:
3838
from sky import dag
@@ -56,6 +56,10 @@
5656
_LOCK_FILENAME = '~/.sky/.{}.lock'
5757
_FILELOCK_TIMEOUT_SECONDS = 10
5858

59+
_RSYNC_DISPLAY_OPTION = '-Pavz'
60+
_RSYNC_FILTER_OPTION = '--filter=\'dir-merge,- .gitignore\''
61+
_RSYNC_EXCLUDE_OPTION = '--exclude-from=.git/info/exclude'
62+
5963

6064
def _check_cluster_name_is_valid(cluster_name: str) -> None:
6165
"""Errors out on invalid cluster names not supported by cloud providers.
@@ -100,43 +104,18 @@ def _get_task_demands_dict(
100104
return accelerator_dict
101105

102106

103-
def _path_size_megabytes(path: str, exclude_gitignore: bool = False) -> int:
104-
"""Returns the size of 'path' (directory or file) in megabytes.
105-
106-
Args:
107-
path: The path to check.
108-
exclude_gitignore: If True, excludes files matched in .gitignore.
109-
110-
Returns:
111-
The size of 'path' in megabytes.
112-
"""
113-
if exclude_gitignore:
114-
try:
115-
# FIXME: add git index size (du -hsc .git) in this computation.
116-
awk_program = '{ sum += $1 } END { print sum }'
117-
return int(
118-
subprocess.check_output(
119-
f'( git status --short {path} | '
120-
'grep "^?" | cut -d " " -f2- '
121-
f'&& git ls-files {path} ) | '
122-
'xargs -n 1000 du -hsk | '
123-
f'awk {awk_program!r}',
124-
shell=True,
125-
stderr=subprocess.DEVNULL)) // (2**10)
126-
except (subprocess.CalledProcessError, ValueError):
127-
# If git is not installed, or if the user is not in a git repo.
128-
# Fall back to du -shk if it is not a git repo (size does not
129-
# consider .gitignore).
130-
logger.debug('Failed to get size with .gitignore exclusion, '
131-
'falling back to du -shk')
132-
pass
133-
return int(
134-
subprocess.check_output([
135-
'du',
136-
'-sh',
137-
'-k',
138-
path,
139-
]).split()[0].decode('utf-8')) // (2**10)
107+
def _path_size_megabytes(path: str) -> int:
    """Returns the size of 'path' (directory or file) in megabytes.

    The size is taken from the summary line of an rsync dry run, so the
    .gitignore filter (and .git/info/exclude, when that file exists under
    'path') is applied to the estimate.

    Args:
        path: The local path to measure. A leading '~' is expanded.

    Returns:
        The total size reported by rsync, in units of 10**6 bytes, rounded
        down.

    Raises:
        subprocess.CalledProcessError: If the rsync command exits non-zero.
        ValueError: If rsync's output contains no 'total size is N' summary.
    """
    # Function-scope imports: only this helper needs these modules.
    import re
    import shlex

    # The path is shell-quoted below, which would suppress the shell's own
    # '~' expansion; expand it explicitly instead.
    expanded_path = os.path.expanduser(path)

    git_exclude_filter = ''
    git_exclude_file = pathlib.Path(expanded_path) / '.git/info/exclude'
    if git_exclude_file.exists():
        # rsync resolves --exclude-from against the current working
        # directory, so the relative path in _RSYNC_EXCLUDE_OPTION only works
        # when cwd == path. Pass the file that was actually checked above.
        exclude_option = shlex.quote(f'--exclude-from={git_exclude_file}')
        git_exclude_filter = f' {exclude_option}'

    # _RSYNC_FILTER_OPTION already embeds shell quoting, hence shell=True;
    # every path-derived piece is quoted so spaces and metacharacters are
    # passed through literally.
    rsync_command = (f'rsync {_RSYNC_DISPLAY_OPTION} {_RSYNC_FILTER_OPTION}'
                     f'{git_exclude_filter} --dry-run '
                     f'{shlex.quote(expanded_path)}')
    rsync_output = subprocess.check_output(rsync_command, shell=True).decode(
        'utf-8', errors='replace')

    # Summary line looks like:
    #   total size is 250,957,728  speedup is 330.19 (DRY RUN)
    # (older rsync versions print the number without thousands separators).
    # Use the last match, since the summary is printed after the file list.
    sizes = re.findall(r'total size is ([\d,]+)', rsync_output)
    if not sizes:
        raise ValueError(
            f'Could not find total size in rsync output for {path!r}.')
    total_bytes = int(sizes[-1].replace(',', ''))
    return total_bytes // 10**6
140119

141120

142121
class RayCodeGen:
@@ -201,7 +180,7 @@ def add_prologue(self, job_id: int) -> None:
201180
import ray
202181
import ray.util as ray_util
203182
204-
from sky.skylet import job_lib
183+
from sky.skylet import job_lib, log_utils
205184
206185
SKY_REMOTE_WORKDIR = {log_lib.SKY_REMOTE_WORKDIR!r}
207186
job_lib.set_status({job_id!r}, job_lib.JobStatus.PENDING)
@@ -946,7 +925,7 @@ def ray_up(start_streaming_at):
946925
log_abs_path,
947926
stream_logs=False,
948927
start_streaming_at=start_streaming_at,
949-
parse_ray_up_logs=True,
928+
line_processor=log_utils.RayUpLineProcessor(),
950929
# Reduce BOTO_MAX_RETRIES from 12 to 5 to avoid long hanging
951930
# time during 'ray up' if insufficient capacity occurs.
952931
env=dict(os.environ, BOTO_MAX_RETRIES='5'),
@@ -1397,15 +1376,13 @@ def sync_workdir(self, handle: ResourceHandle, workdir: Path) -> None:
13971376
workdir = os.path.join(workdir, '') # Adds trailing / if needed.
13981377

13991378
# Raise warning if directory is too large
1400-
dir_size = _path_size_megabytes(full_workdir, exclude_gitignore=True)
1379+
dir_size = _path_size_megabytes(full_workdir)
14011380
if dir_size >= _PATH_SIZE_MEGABYTES_WARN_THRESHOLD:
14021381
logger.warning(
14031382
f'{fore.YELLOW}The size of workdir {workdir!r} '
14041383
f'is {dir_size} MB. Try to keep workdir small or use '
1405-
'.gitignore to exclude large files, as '
1406-
'large sizes will slow down rsync. If you use .gitignore but '
1407-
'the path is not initialized in git, you can ignore this '
1408-
f'warning.{style.RESET_ALL}')
1384+
'.gitignore to exclude large files, as large sizes will slow '
1385+
'down rsync. {style.RESET_ALL}')
14091386

14101387
log_path = os.path.join(self.log_dir, 'workdir_sync.log')
14111388

@@ -1512,16 +1489,13 @@ def _sync_node(ip):
15121489
full_src = os.path.abspath(os.path.expanduser(src))
15131490
# Checked during Task.set_file_mounts().
15141491
assert os.path.exists(full_src), f'{full_src} does not exist.'
1515-
src_size = _path_size_megabytes(full_src,
1516-
exclude_gitignore=True)
1492+
src_size = _path_size_megabytes(full_src)
15171493
if src_size >= _PATH_SIZE_MEGABYTES_WARN_THRESHOLD:
15181494
logger.warning(
15191495
f'{fore.YELLOW}The size of file mount src {src!r} '
15201496
f'is {src_size} MB. Try to keep src small or use '
1521-
'.gitignore to exclude large files, as '
1522-
'large sizes will slow down rsync. If you use '
1523-
'.gitignore but the path is not initialized in git, you'
1524-
f' can ignore this warning.{style.RESET_ALL}')
1497+
'.gitignore to exclude large files, as large sizes '
1498+
f'will slow down rsync. {style.RESET_ALL}')
15251499
if os.path.islink(full_src):
15261500
logger.warning(
15271501
f'{fore.YELLOW}Source path {src!r} is a symlink. '
@@ -2171,19 +2145,19 @@ def _rsync_up(
21712145
# shooting a lot of messages to the output. --info=progress2 is used
21722146
# to get a total progress bar, but it requires rsync>=3.1.0 and Mac
21732147
# OS has a default rsync==2.6.9 (16 years old).
2174-
rsync_command = ['rsync', '-Pavz']
2148+
rsync_command = ['rsync', _RSYNC_DISPLAY_OPTION]
21752149
# Legend
21762150
# dir-merge: ignore file can appear in any subdir, applies to that
21772151
# subdir downwards
21782152
# Note that "-" is mandatory for rsync and means all patterns in the
21792153
# ignore files are treated as *exclude* patterns. Non-exclude
21802154
# patterns, e.g., "! do_not_exclude" doesn't work, even though git
21812155
# allows it.
2182-
rsync_command.append('--filter=\'dir-merge,- .gitignore\'')
2156+
rsync_command.append(_RSYNC_FILTER_OPTION)
21832157
git_exclude = '.git/info/exclude'
21842158
if (pathlib.Path(source) / git_exclude).exists():
21852159
# Ensure file exists; otherwise, rsync will error out.
2186-
rsync_command.append('--exclude-from=.git/info/exclude')
2160+
rsync_command.append(_RSYNC_EXCLUDE_OPTION)
21872161

21882162
ssh_options = ' '.join(
21892163
backend_utils.ssh_options_list(ssh_key,

sky/skylet/log_lib.py

+50-70
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
"""Sky logging utils.
1+
"""Sky logging library.
22
33
This is a remote utility module that provides logging functionality.
44
"""
5-
import enum
65
import io
76
import os
87
import selectors
@@ -14,47 +13,32 @@
1413
from typing import Iterator, List, Optional, Tuple, Union
1514

1615
import colorama
17-
import rich.status
1816

19-
from sky.skylet import job_lib
17+
from sky.skylet import job_lib, log_utils
2018
from sky import sky_logging
2119

2220
SKY_REMOTE_WORKDIR = '~/sky_workdir'
2321

2422
logger = sky_logging.init_logger(__name__)
2523

2624

27-
class ProvisionStatus(enum.Enum):
28-
LAUNCH = 0
29-
RUNTIME_SETUP = 1
30-
31-
32-
def _update_ray_up_status(log_line, ray_up_state: str,
33-
status_display: rich.status.Status) -> None:
34-
if 'Shared connection to' in log_line and ray_up_state[
35-
'state'] == ProvisionStatus.LAUNCH:
36-
status_display.stop()
37-
logger.info(f'{colorama.Fore.GREEN}Head node is up.'
38-
f'{colorama.Style.RESET_ALL}')
39-
status_display.start()
40-
status_display.update('[bold cyan] Preparing Sky runtime')
41-
ray_up_state['state'] = ProvisionStatus.RUNTIME_SETUP
42-
43-
44-
def redirect_process_output(proc,
45-
log_path: str,
46-
stream_logs: bool,
47-
start_streaming_at: str = '',
48-
skip_lines: Optional[List[str]] = None,
49-
parse_ray_up_logs: bool = False,
50-
replace_crlf: bool = False) -> Tuple[str, str]:
25+
def redirect_process_output(
26+
proc,
27+
log_path: str,
28+
stream_logs: bool,
29+
start_streaming_at: str = '',
30+
skip_lines: Optional[List[str]] = None,
31+
replace_crlf: bool = False,
32+
line_processor: Optional[log_utils.LineProcessor] = None
33+
) -> Tuple[str, str]:
5134
"""Redirect the process's filtered stdout/stderr to both stream and file"""
52-
# FIXME(gmittal): Remove hard-coded `parse_ray_up_logs` flag and add general
53-
# LineParser: https://github.com/sky-proj/sky/pull/565#discussion_r826615923
5435
log_path = os.path.expanduser(log_path)
5536
dirname = os.path.dirname(log_path)
5637
os.makedirs(dirname, exist_ok=True)
5738

39+
if line_processor is None:
40+
line_processor = log_utils.LineProcessor()
41+
5842
sel = selectors.DefaultSelector()
5943
out_io = io.TextIOWrapper(proc.stdout,
6044
encoding='utf-8',
@@ -72,44 +56,40 @@ def redirect_process_output(proc,
7256
stderr = ''
7357

7458
start_streaming_flag = False
75-
if parse_ray_up_logs:
76-
ray_up_state = {'state': ProvisionStatus.LAUNCH}
77-
provision_status = rich.status.Status('[bold cyan]Launching')
78-
provision_status.start()
79-
with open(log_path, 'a') as fout:
80-
while len(sel.get_map()) > 0:
81-
events = sel.select()
82-
for key, _ in events:
83-
line = key.fileobj.readline()
84-
if not line:
85-
# Unregister the io when EOF reached
86-
sel.unregister(key.fileobj)
87-
continue
88-
if replace_crlf and line.endswith('\r\n'):
89-
# Replace CRLF with LF to avoid ray logging to the same line
90-
# due to separating lines with '\n'.
91-
line = line[:-2] + '\n'
92-
if (skip_lines is not None and
93-
any(skip in line for skip in skip_lines)):
94-
continue
95-
if start_streaming_at in line:
96-
start_streaming_flag = True
97-
if key.fileobj is out_io:
98-
stdout += line
99-
out_stream = sys.stdout
100-
else:
101-
stderr += line
102-
out_stream = sys.stderr
103-
if stream_logs and start_streaming_flag:
104-
out_stream.write(line)
105-
out_stream.flush()
106-
if log_path != '/dev/null':
107-
fout.write(line)
108-
fout.flush()
109-
if parse_ray_up_logs:
110-
_update_ray_up_status(line, ray_up_state, provision_status)
111-
if parse_ray_up_logs:
112-
provision_status.stop()
59+
with line_processor:
60+
with open(log_path, 'a') as fout:
61+
while len(sel.get_map()) > 0:
62+
events = sel.select()
63+
for key, _ in events:
64+
line = key.fileobj.readline()
65+
if not line:
66+
# Unregister the io when EOF reached
67+
sel.unregister(key.fileobj)
68+
continue
69+
# TODO(zhwu,gmittal): Put replace_crlf, skip_lines, and
70+
# start_streaming_at logic in processor.process_line(line)
71+
if replace_crlf and line.endswith('\r\n'):
72+
# Replace CRLF with LF to avoid ray logging to the same
73+
# line due to separating lines with '\n'.
74+
line = line[:-2] + '\n'
75+
if (skip_lines is not None and
76+
any(skip in line for skip in skip_lines)):
77+
continue
78+
if start_streaming_at in line:
79+
start_streaming_flag = True
80+
if key.fileobj is out_io:
81+
stdout += line
82+
out_stream = sys.stdout
83+
else:
84+
stderr += line
85+
out_stream = sys.stderr
86+
if stream_logs and start_streaming_flag:
87+
out_stream.write(line)
88+
out_stream.flush()
89+
if log_path != '/dev/null':
90+
fout.write(line)
91+
fout.flush()
92+
line_processor.process_line(line)
11393
return stdout, stderr
11494

11595

@@ -122,7 +102,7 @@ def run_with_log(
122102
shell: bool = False,
123103
with_ray: bool = False,
124104
redirect_stdout_stderr: bool = True,
125-
parse_ray_up_logs: bool = False,
105+
line_processor: Optional[log_utils.LineProcessor] = None,
126106
**kwargs,
127107
) -> Union[int, Tuple[int, str, str]]:
128108
"""Runs a command and logs its output to a file.
@@ -181,7 +161,7 @@ def run_with_log(
181161
'bash: cannot set terminal process group',
182162
'bash: no job control in this shell',
183163
],
184-
parse_ray_up_logs=parse_ray_up_logs,
164+
line_processor=line_processor,
185165
# Replace CRLF when the output is logged to driver by ray.
186166
replace_crlf=with_ray,
187167
)

sky/skylet/log_utils.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Sky logging utils."""
2+
import enum
3+
4+
import colorama
5+
import rich.status
6+
7+
from sky import sky_logging
8+
9+
logger = sky_logging.init_logger(__name__)
10+
11+
12+
class LineProcessor(object):
    """A processor for log lines.

    Base class that does nothing with the lines it is given. It is used as a
    context manager around a stream of log lines; subclasses override the
    hooks below to react to entering, to each line, and to exiting.
    """

    def __enter__(self):
        # Return the processor so `with LineProcessor() as p:` binds the
        # instance rather than None.
        return self

    def process_line(self, log_line):
        """Handles one log line. The base implementation ignores it."""
        del log_line  # unused

    def __exit__(self, except_type, except_value, traceback):
        # Returns None, so an exception raised inside the `with` body is
        # propagated rather than suppressed.
        del except_type, except_value, traceback  # unused
24+
25+
26+
class RayUpLineProcessor(LineProcessor):
    """A processor for `ray up` log lines.

    While used as a context manager it shows a rich status display. The
    display starts as 'Launching' and, the first time a line containing
    'Shared connection to' is seen, logs 'Head node is up.' and switches the
    text to 'Launching - Preparing Sky runtime'.
    """

    class ProvisionStatus(enum.Enum):
        # No 'Shared connection to' line has been seen yet.
        LAUNCH = 0
        # The 'Shared connection to' line has been seen.
        RUNTIME_SETUP = 1

    def __enter__(self):
        self.state = self.ProvisionStatus.LAUNCH
        self.status_display = rich.status.Status('[bold cyan]Launching')
        self.status_display.start()
        # Return the processor so `with RayUpLineProcessor() as p:` binds the
        # instance rather than None.
        return self

    def process_line(self, log_line):
        """Updates the status display based on one `ray up` log line.

        Must be called between __enter__ and __exit__, since the state and
        the status display are created in __enter__.
        """
        if ('Shared connection to' in log_line and
                self.state == self.ProvisionStatus.LAUNCH):
            # The display is stopped around the log call and restarted
            # afterwards, then its text is updated.
            self.status_display.stop()
            logger.info(f'{colorama.Fore.GREEN}Head node is up.'
                        f'{colorama.Style.RESET_ALL}')
            self.status_display.start()
            self.status_display.update(
                '[bold cyan]Launching - Preparing Sky runtime')
            # The state change makes this transition happen at most once.
            self.state = self.ProvisionStatus.RUNTIME_SETUP

    def __exit__(self, except_type, except_value, traceback):
        del except_type, except_value, traceback  # unused
        self.status_display.stop()

0 commit comments

Comments
 (0)