Skip to content

Commit 397a300

Browse files
authored
Merge pull request #53 from CABLE-LSM/50-reconcile-upload-methods
Reconciled upload methods and fixed tests. Fixes #50
2 parents f9c604f + 60e035c commit 397a300

File tree

5 files changed

+111
-81
lines changed

5 files changed

+111
-81
lines changed

meorg_client/cli.py

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,13 @@ def list_endpoints():
109109

110110
@click.command("upload")
111111
@click.argument("file_path", nargs=-1)
112+
@click.option("-n", default=1, help="Number of threads for parallel uploads.")
112113
@click.option(
113114
"--attach_to",
114115
default=None,
115116
help="Supply a model output id to immediately attach the file to.",
116117
)
117-
def file_upload(file_path, attach_to=None):
118+
def file_upload(file_path, n: int = 1, attach_to=None):
118119
"""
119120
Upload a file to the server.
120121
@@ -125,41 +126,23 @@ def file_upload(file_path, attach_to=None):
125126
client = _get_client()
126127

127128
# Upload the file, get the job ID
128-
response = _call(client.upload_files, files=list(file_path), attach_to=attach_to)
129+
responses = _call(
130+
client.upload_files,
131+
files=list(file_path),
132+
n=n,
133+
attach_to=attach_to,
134+
progress=True,
135+
)
136+
137+
for response in responses:
138+
139+
# For singular case
140+
if n == 1:
141+
response = response[0]
129142

130-
# Different logic if we are attaching to a model output immediately
131-
if not attach_to:
132143
files = response.get("data").get("files")
133144
for f in files:
134145
click.echo(f.get("file"))
135-
else:
136-
click.echo("SUCCESS")
137-
138-
139-
@click.command("upload_parallel")
140-
@click.argument("file_paths", nargs=-1)
141-
@click.option(
142-
"-n", default=2, help="Number of simultaneous parallel uploads (default=2)."
143-
)
144-
@click.option(
145-
"--attach_to",
146-
default=None,
147-
help="Supply a model output id to immediately attach the file to.",
148-
)
149-
def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None):
150-
"""Upload files in parallel.
151-
152-
Parameters
153-
----------
154-
file_paths : tuple
155-
Sequence of file paths.
156-
n : int, optional
157-
Number of parallel uploads, by default 2
158-
"""
159-
client = _get_client()
160-
responses = _call(client.upload_files_parallel, files=list(file_paths), n=n)
161-
for response in responses:
162-
click.echo(response.get("data").get("files")[0].get("file"))
163146

164147

165148
@click.command("list")
@@ -303,7 +286,7 @@ def cli_analysis():
303286
# Add file commands
304287
cli_file.add_command(file_list)
305288
cli_file.add_command(file_upload)
306-
cli_file.add_command(file_upload_parallel)
289+
# cli_file.add_command(file_upload_parallel)
307290
cli_file.add_command(file_attach)
308291

309292
# Add endpoint commands

meorg_client/client.py

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import meorg_client.parallel as meop
1414
import mimetypes as mt
1515
from pathlib import Path
16+
from tqdm import tqdm
1617

1718

1819
class Client:
@@ -217,8 +218,12 @@ def logout(self):
217218
self.headers.pop("X-User-Id", None)
218219
self.headers.pop("X-Auth-Token", None)
219220

220-
def upload_files_parallel(
221-
self, files: Union[str, Path, list], n: int = 2, attach_to: str = None
221+
def _upload_files_parallel(
222+
self,
223+
files: Union[str, Path, list],
224+
n: int = 2,
225+
attach_to: str = None,
226+
progress=True,
222227
):
223228
"""Upload files in parallel.
224229
@@ -240,21 +245,61 @@ def upload_files_parallel(
240245
# Ensure the object is actually iterable
241246
files = mu.ensure_list(files)
242247

243-
# Single file provided, don't bother starting the pool
244-
if len(files) == 1:
245-
return self.upload_files(files)
246-
247248
# Do the parallel upload
248249
responses = None
249250
responses = meop.parallelise(
250-
self.upload_files, n, files=files, attach_to=attach_to
251+
self._upload_file, n, files=files, attach_to=attach_to, progress=progress
251252
)
252253

253254
return responses
254255

255256
def upload_files(
    self,
    files: Union[str, Path, list],
    n: int = 1,
    attach_to: str = None,
    progress=True,
) -> list:
    """Upload one or more files, sequentially or in parallel.

    Parameters
    ----------
    files : Union[str, Path, list]
        A filepath, or a list of filepaths.
    n : int, optional
        Number of threads to parallelise over, by default 1. A value of 1
        uploads the files one after another in the calling process.
    attach_to : str, optional
        Model output ID to immediately attach to, by default None
    progress : bool, optional
        Show a progress bar while uploading, by default True

    Returns
    -------
    list
        The upload responses, wrapped in a list.

    Raises
    ------
    ValueError
        If `n` is less than 1.
    """

    # Just because someone will try to assign 0 threads...
    # NOTE: the previous guard `n >= 1 == False` was a chained comparison,
    # i.e. `(n >= 1) and (1 == False)`, which can never be true.
    if n < 1:
        raise ValueError("Number of threads must be greater than or equal to 1.")

    # Ensure the files are actually a list
    files = mu.ensure_list(files)

    if n == 1:
        # Sequential upload; honour `progress` by disabling the bar on request.
        responses = [
            self._upload_file(fp, attach_to=attach_to)
            for fp in tqdm(files, total=len(files), disable=not progress)
        ]
    else:
        # Hand over to the worker pool for n > 1.
        responses = self._upload_files_parallel(
            files, n=n, attach_to=attach_to, progress=progress
        )

    return mu.ensure_list(responses)
299+
300+
def _upload_file(
301+
self,
302+
files: Union[str, Path, list],
258303
attach_to: str = None,
259304
) -> Union[dict, requests.Response]:
260305
"""Upload a file.
@@ -324,7 +369,7 @@ def upload_files(
324369
attach_to, files=mu.get_uploaded_file_ids(response)
325370
)
326371

327-
return response
372+
return mu.ensure_list(response)
328373

329374
def list_files(self, id: str) -> Union[dict, requests.Response]:
330375
"""Get a list of model outputs.
@@ -458,22 +503,3 @@ def success(self) -> bool:
458503
True if successful, False otherwise.
459504
"""
460505
return self.last_response.status_code in mcc.HTTP_STATUS_SUCCESS_RANGE
461-
462-
def is_initialised(self, dev=False) -> bool:
463-
"""Check if the client is initialised.
464-
465-
NOTE: This does not check the login actually works.
466-
467-
Parameters
468-
----------
469-
dev : bool, optional
470-
Use dev credentials, by default False
471-
472-
Returns
473-
-------
474-
bool
475-
True if initialised, False otherwise.
476-
"""
477-
cred_filename = "credentials.json" if not dev else "credentials-dev.json"
478-
cred_filepath = mu.get_user_data_filepath(cred_filename)
479-
return cred_filepath.exists()

meorg_client/parallel.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pandas as pd
44
import multiprocessing as mp
5+
from tqdm import tqdm
56

67

78
def _execute(mp_args: tuple):
@@ -31,7 +32,7 @@ def _convert_kwargs(**kwargs):
3132
return pd.DataFrame(kwargs).to_dict("records")
3233

3334

34-
def parallelise(func: callable, num_threads: int, **kwargs):
35+
def parallelise(func: callable, num_threads: int, progress=True, **kwargs):
3536
"""Execute `func` in parallel over `num_threads`.
3637
3738
Parameters
@@ -56,11 +57,22 @@ def parallelise(func: callable, num_threads: int, **kwargs):
5657
mp_args = [[func, mp_arg] for mp_arg in mp_args]
5758

5859
# Start with empty results
59-
results = None
60+
results = list()
6061

6162
# Establish a pool of workers (blocking)
6263
with mp.Pool(processes=num_threads) as pool:
63-
results = pool.map(_execute, mp_args)
64+
65+
if progress:
66+
67+
with tqdm(total=len(mp_args)) as pbar:
68+
for result in pool.map(_execute, mp_args):
69+
results.append(result[0])
70+
pbar.update()
71+
72+
else:
73+
74+
for result in pool.map(_execute, mp_args):
75+
results.append(result[0])
6476

6577
# Return the results
6678
return results

meorg_client/tests/test_cli.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def test_file_upload(runner: CliRunner, test_filepath: str):
4545
assert result.exit_code == 0
4646

4747
# Add the job_id to the store for the next test
48-
store.set("file_id", result.output.strip())
48+
store.set("file_id", result.stdout.split()[-1].strip())
4949

5050
# Let it wait for a short while, allow the server to transfer to object store.
5151
time.sleep(5)
@@ -65,16 +65,6 @@ def test_file_multiple(runner: CliRunner, test_filepath: str):
6565
time.sleep(5)
6666

6767

68-
def test_file_upload_parallel(runner: CliRunner, test_filepath: str):
69-
"""Test file-upload via CLI."""
70-
71-
# Upload a tiny test file
72-
result = runner.invoke(
73-
cli.file_upload_parallel, [test_filepath, test_filepath, "-n 2"]
74-
)
75-
assert result.exit_code == 0
76-
77-
7868
def test_file_list(runner):
7969
"""Test file-list via CLI."""
8070
result = runner.invoke(cli.file_list, [store.get("model_output_id")])
@@ -100,12 +90,20 @@ def test_file_upload_with_attach(runner, test_filepath):
10090
assert result.exit_code == 0
10191

10292

93+
def test_file_upload_parallel(runner: CliRunner, test_filepath: str):
94+
"""Test file-upload via CLI."""
95+
96+
# Upload a tiny test file
97+
result = runner.invoke(cli.file_upload, [test_filepath, test_filepath, "-n", "2"])
98+
assert result.exit_code == 0
99+
100+
103101
def test_file_upload_parallel_with_attach(runner, test_filepath):
104102
"""Test file upload with attachment via CLI."""
105103
model_output_id = store.get("model_output_id")
106104
result = runner.invoke(
107-
cli.file_upload_parallel,
108-
[test_filepath, test_filepath, "--attach_to", model_output_id],
105+
cli.file_upload,
106+
[test_filepath, test_filepath, "-n", "2", "--attach_to", model_output_id],
109107
)
110108
assert result.exit_code == 0
111109

meorg_client/tests/test_client.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def test_list_endpoints(client: Client):
7777
def test_upload_file(client: Client, test_filepath: str):
7878
"""Test the uploading of a file."""
7979
# Upload the file
80-
response = client.upload_files(test_filepath)
80+
response = client.upload_files(test_filepath)[0]
8181

8282
# Make sure it worked
8383
assert client.success()
@@ -108,7 +108,7 @@ def test_file_list(client: Client):
108108

109109
def test_attach_files_to_model_output(client: Client):
110110
# Get the file id from the job id
111-
file_id = store.get("file_upload").get("data").get("files")[0].get("file")
111+
file_id = store.get("file_upload")[0].get("data").get("files")[0].get("file")
112112

113113
# Attach it to the model output
114114
_ = client.attach_files_to_model_output(client._model_output_id, [file_id])
@@ -165,7 +165,18 @@ def test_upload_files_with_attach(client: Client):
165165
def test_upload_file_parallel(client: Client, test_filepath: str):
166166
"""Test the uploading of a file."""
167167
# Upload the file
168-
responses = client.upload_files_parallel([test_filepath, test_filepath], n=2)
168+
responses = client.upload_files([test_filepath, test_filepath], n=2, progress=True)
169+
170+
# Make sure it worked
171+
assert all(
172+
[response.get("data").get("files")[0].get("file") for response in responses]
173+
)
174+
175+
176+
def test_upload_file_parallel_no_progress(client: Client, test_filepath: str):
177+
"""Test the uploading of a file."""
178+
# Upload the file
179+
responses = client.upload_files([test_filepath, test_filepath], n=2, progress=False)
169180

170181
# Make sure it worked
171182
assert all(
@@ -176,7 +187,7 @@ def test_upload_file_parallel(client: Client, test_filepath: str):
176187
def test_upload_file_parallel_with_attach(client: Client, test_filepath: str):
177188
"""Test the uploading of a file with a model output ID to attach."""
178189
# Upload the file
179-
responses = client.upload_files_parallel(
190+
responses = client.upload_files(
180191
[test_filepath, test_filepath], n=2, attach_to=client._model_output_id
181192
)
182193

0 commit comments

Comments
 (0)