Commit 30476aa

[Storage] Support multiple files in Storage (skypilot-org#1311)
* Set rename_dir_lim for gcsfuse
* Add support for list of sources for Storage
* fix demo yaml
* tests
* lint
* lint
* test
* add validation
* address zhwu comments
* add error on basename conflicts
* use gsutil cp -n instead of gsutil rsync
* lint
* fix name
* parallelize gsutil rsync
* parallelize aws s3 rsync
* lint
* address comments
* refactor
* lint
* address comments
* update schema
1 parent 50abaee commit 30476aa

File tree: 7 files changed, +469 −156 lines changed

docs/source/reference/storage.rst

+22 −4

@@ -155,6 +155,23 @@ and storage mounting:
     name: romil-output-bucket
     mode: MOUNT
 
+  # *** Uploading multiple files to the same Storage object ***
+  #
+  # The source field in a storage object can also be a list of local paths.
+  # This is useful when multiple files or directories need to be uploaded to the
+  # same bucket.
+  #
+  # Note: The basenames of each path in the source list are copied recursively
+  # to the root of the bucket. Thus, If the source list contains a directory,
+  # the entire directory is copied to the root of the bucket. For instance,
+  # in this example, the contents of ~/datasets are copied to
+  # s3://sky-multisource-storage/datasets/. ~/mydir/myfile.txt will appear
+  # at s3://sky-multisource-storage/myfile.txt.
+  /datasets-multisource-storage:
+    name: sky-multisource-storage2 # Make sure this name is unique or you own this bucket
+    source: [~/mydir/myfile.txt, ~/datasets]
+
 run: |
   pwd
   ls -la /

@@ -255,10 +272,11 @@ Storage YAML reference

 sky.Storage.source: str
   The source attribute specifies the local path that must be made available
-  in the storage object. It can either be a local path, in which case data
-  is uploaded to the cloud to an appropriate object store (s3 or gcs), or it
-  can be a remote path (s3://, gs://), in which case it is copied or mounted
-  directly (see mode flag below).
+  in the storage object. It can either be a local path or a list of local
+  paths or it can be a remote path (s3://, gs://).
+  If the source is local, data is uploaded to the cloud to an appropriate
+  object store (s3 or gcs). If the path is remote, the data is copied
+  or mounted directly (see mode flag below).

 sky.Storage.store: str; either of 's3' or 'gcs'
   If you wish to force sky.Storage to be backed by a specific cloud object
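
For users of the Python API rather than YAML, a minimal sketch of the same multi-source upload follows. It assumes the programmatic sky.Storage constructor accepts the same name/source fields documented above; the mount path and task are illustrative and not part of this commit.

import sky

# Hedged sketch: both sources land in the bucket under their basenames,
#   ~/datasets         -> <bucket>/datasets/
#   ~/mydir/myfile.txt -> <bucket>/myfile.txt
storage = sky.Storage(
    name='sky-multisource-storage2',  # must be unique or a bucket you own
    source=['~/mydir/myfile.txt', '~/datasets'])

task = sky.Task(run='ls -la /datasets-multisource-storage')
task.set_storage_mounts({'/datasets-multisource-storage': storage})
sky.launch(task)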

examples/storage_demo.yaml

+18 −7

@@ -8,9 +8,6 @@
 
 name: storage-demo
 
-resources:
-  cloud: aws
-
 ###############################################
 #              SkyPilot storage               #
 ###############################################
@@ -40,7 +37,7 @@ resources:
 # 3. You want to have a write-able path to directly write files to S3 buckets -
 #    specify a name (to create a bucket if it doesn't exist) and set the mode
 #    to MOUNT. This is useful for writing code outputs, such as checkpoints or
-#    logs directly to a S3 bucket.
+#    logs directly to a S3/GCS bucket.
 #
 # 4. You want to have a shared file-system across workers running on different
 #    nodes - specify a name (to create a bucket if it doesn't exist) and set
@@ -94,7 +91,7 @@ file_mounts:
   # When the VM is initialized, the contents of the bucket are copied to
   # /datasets-storage. If the bucket already exists, it is fetched and re-used.
   /datasets-storage:
-    name: sky-dataset-romilzz # Make sure this name is unique or you own this bucket
+    name: sky-dataset-mybucket # Make sure this name is unique or you own this bucket
     source: ~/datasets
     store: s3 # Could be either of [s3, gcs]; default: None
     persistent: True # Defaults to True, can be set to false.
@@ -110,7 +107,7 @@ file_mounts:
   # other storage mounts using the same bucket with MOUNT mode. Note that the
   # source is synced with the remote bucket everytime this task is run.
   /dataset-storage-mount:
-    name: sky-dataset-romilzz
+    name: sky-dataset-mybucket
     source: ~/datasets
     mode: MOUNT
 
@@ -137,9 +134,23 @@ file_mounts:
   # this approach can also be used to create a shared filesystem across workers.
   # See examples/storage/pingpong.yaml for an example.
   /outputs-mount:
-    name: romil-output-bucketzz
+    name: sky-output-bucket # Make sure this name is unique or you own this bucket
     mode: MOUNT
 
+  # *** Uploading multiple files to the same Storage object ***
+  #
+  # The source field in a storage object can also be a list of local paths.
+  # This is useful when multiple files or directories need to be uploaded to the
+  # same bucket.
+  #
+  # Note: If the source list contains a directory, the entire directory is copied
+  # to the root of the bucket. For instance, in this example, the contents of
+  # ~/datasets are copied to s3://sky-multisource-storage/datasets/. ~/mydir/myfile.txt
+  # will appear at s3://sky-multisource-storage/myfile.txt.
+  /datasets-multisource-storage:
+    name: sky-multisource-storage # Make sure this name is unique or you own this bucket
+    source: [~/mydir/myfile.txt, ~/datasets]
+
 run: |
   pwd
   ls -la /
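
The destination layout described in the comments above follows from how the sources are grouped before upload; the grouping is implemented by the _group_files_by_dir helper added to sky/data/data_utils.py further down this page. A minimal sketch of how this demo's source list would be grouped, assuming ~/mydir/myfile.txt is a file and ~/datasets is a directory on the local machine:

from sky.data import data_utils

grouped_files, dirs = data_utils._group_files_by_dir(
    ['~/mydir/myfile.txt', '~/datasets'])
# grouped_files groups plain files by their (expanded, absolute) parent
# directory, so each group is uploaded with a single file-sync command, e.g.
#   {'/home/<user>/mydir': ['myfile.txt']}
# dirs collects the directories, each uploaded with its own dir-sync command:
#   ['/home/<user>/datasets']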

sky/data/data_utils.py

+125 −1

@@ -1,12 +1,21 @@
 """Miscellaneous Utils for Sky Data
 """
-from typing import Any, Tuple
+from multiprocessing import pool
+import os
+import subprocess
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, Tuple
 import urllib.parse
 
+from sky import exceptions
+from sky import sky_logging
 from sky.adaptors import aws, gcp
+from sky.utils import ux_utils
 
 Client = Any
 
+logger = sky_logging.init_logger(__name__)
+
 
 def split_s3_path(s3_path: str) -> Tuple[str, str]:
     """Splits S3 Path into Bucket name and Relative Path to Bucket
@@ -69,3 +78,118 @@ def is_cloud_store_url(url):
     result = urllib.parse.urlsplit(url)
     # '' means non-cloud URLs.
     return result.netloc
+
+
+def _group_files_by_dir(
+        source_list: List[str]) -> Tuple[Dict[str, List[str]], List[str]]:
+    """Groups a list of paths based on their directory
+
+    Given a list of paths, generates a dict of {dir_name: List[file_name]}
+    which groups files with same dir, and a list of dirs in the source_list.
+
+    This is used to optimize uploads by reducing the number of calls to rsync.
+    E.g., ['a/b/c.txt', 'a/b/d.txt', 'a/e.txt'] will be grouped into
+    {'a/b': ['c.txt', 'd.txt'], 'a': ['e.txt']}, and these three files can be
+    uploaded in two rsync calls instead of three.
+
+    Args:
+        source_list: List[str]; List of paths to group
+    """
+    grouped_files = {}
+    dirs = []
+    for source in source_list:
+        source = os.path.abspath(os.path.expanduser(source))
+        if os.path.isdir(source):
+            dirs.append(source)
+        else:
+            base_path = os.path.dirname(source)
+            file_name = os.path.basename(source)
+            if base_path not in grouped_files:
+                grouped_files[base_path] = []
+            grouped_files[base_path].append(file_name)
+    return grouped_files, dirs
+
+
+def parallel_upload(source_path_list: List[Path],
+                    filesync_command_generator: Callable[[str, List[str]], str],
+                    dirsync_command_generator: Callable[[str, str], str],
+                    bucket_name: str,
+                    access_denied_message: str,
+                    create_dirs: bool = False,
+                    max_concurrent_uploads: Optional[int] = None) -> None:
+    """Helper function to run parallel uploads for a list of paths.
+
+    Used by S3Store and GCSStore to run rsync commands in parallel by
+    providing appropriate command generators.
+
+    Args:
+        source_path_list: List of paths to local files or directories
+        filesync_command_generator: Callable that generates rsync command
+            for a list of files belonging to the same dir.
+        dirsync_command_generator: Callable that generates rsync command
+            for a directory.
+        access_denied_message: Message to intercept from the underlying
+            upload utility when permissions are insufficient. Used in
+            exception handling.
+        create_dirs: If the local_path is a directory and this is set to
+            False, the contents of the directory are directly uploaded to
+            root of the bucket. If the local_path is a directory and this is
+            set to True, the directory is created in the bucket root and
+            contents are uploaded to it.
+        max_concurrent_uploads: Maximum number of concurrent threads to use
+            to upload files.
+    """
+    # Generate gsutil rsync command for files and dirs
+    commands = []
+    grouped_files, dirs = _group_files_by_dir(source_path_list)
+    # Generate file upload commands
+    for dir_path, file_names in grouped_files.items():
+        sync_command = filesync_command_generator(dir_path, file_names)
+        commands.append(sync_command)
+    # Generate dir upload commands
+    for dir_path in dirs:
+        if create_dirs:
+            dest_dir_name = os.path.basename(dir_path)
+        else:
+            dest_dir_name = ''
+        sync_command = dirsync_command_generator(dir_path, dest_dir_name)
+        commands.append(sync_command)
+
+    # Run commands in parallel
+    with pool.ThreadPool(processes=max_concurrent_uploads) as p:
+        p.starmap(
+            run_upload_cli,
+            zip(commands, [access_denied_message] * len(commands),
+                [bucket_name] * len(commands)))
+
+
+def run_upload_cli(command: str, access_denied_message: str, bucket_name: str):
+    # TODO(zhwu): Use log_lib.run_with_log() and redirect the output
+    # to a log file.
+    with subprocess.Popen(command,
+                          stderr=subprocess.PIPE,
+                          stdout=subprocess.DEVNULL,
+                          shell=True) as process:
+        stderr = []
+        while True:
+            line = process.stderr.readline()
+            if not line:
+                break
+            str_line = line.decode('utf-8')
+            stderr.append(str_line)
+            if access_denied_message in str_line:
+                process.kill()
+                with ux_utils.print_exception_no_traceback():
+                    raise PermissionError(
+                        'Failed to upload files to '
+                        'the remote bucket. The bucket does not have '
+                        'write permissions. It is possible that '
+                        'the bucket is public.')
+        returncode = process.wait()
+        if returncode != 0:
+            stderr = '\n'.join(stderr)
+            with ux_utils.print_exception_no_traceback():
+                logger.error(stderr)
+                raise exceptions.StorageUploadError(
+                    f'Upload to bucket failed for store {bucket_name}. '
+                    'Please check the logs.')
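
To see how these helpers fit together, a hedged usage sketch follows. The two command generators are illustrative stand-ins modeled on the gsutil commands mentioned in the commit message; they are not the actual generators supplied by S3Store/GCSStore, and the bucket name and access-denied string are assumptions for the example.

from typing import List

from sky.data import data_utils

BUCKET = 'sky-multisource-storage'  # hypothetical bucket for illustration


def filesync_command(dir_path: str, file_names: List[str]) -> str:
    # Upload a group of files sharing one parent directory to the bucket root.
    # `gsutil cp -n` skips objects that already exist in the bucket.
    files = ' '.join(f'{dir_path}/{name}' for name in file_names)
    return f'gsutil -m cp -n {files} gs://{BUCKET}/'


def dirsync_command(src_dir_path: str, dest_dir_name: str) -> str:
    # Recursively sync a whole directory into <bucket>/<dest_dir_name>/.
    return f'gsutil -m rsync -r {src_dir_path} gs://{BUCKET}/{dest_dir_name}'


data_utils.parallel_upload(
    source_path_list=['~/mydir/myfile.txt', '~/datasets'],
    filesync_command_generator=filesync_command,
    dirsync_command_generator=dirsync_command,
    bucket_name=BUCKET,
    access_denied_message='AccessDeniedException',  # assumed gsutil error text
    create_dirs=True,  # keep ~/datasets under <bucket>/datasets/
    max_concurrent_uploads=4)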
