Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolai-vKuegelgen committed Feb 17, 2025
1 parent c5abb90 commit 838d1bb
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions src/cubi_tk/snappy/itransfer_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ def __init__(self, args):
def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
"""Setup common arguments for itransfer commands."""

# FIXME: outsource the SODAR-related arguments (as well as assay & destination) to the irods transfer command & sodar API classes
group_sodar = parser.add_argument_group("SODAR-related")
# FIXME: the (non-env-var?) defaults here should NOT take precedence over the toml file entries
group_sodar.add_argument(
"--sodar-url",
default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"),
Expand All @@ -126,11 +128,12 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--hidden-cmd", dest="snappy_cmd", default=cls.run, help=argparse.SUPPRESS
)
## Not supported anymore/yet
#FIXME: replace this with num_irods_threads
# the irods python client should automatically figure out how many threads to use, so this is optional
# parser.add_argument(
# "--num-parallel-transfers",
# type=int,
# default=DEFAULT_NUM_TRANSFERS,
# type=int|None,
# default=None,
# help="Number of parallel transfers, defaults to %s" % DEFAULT_NUM_TRANSFERS,
# )
parser.add_argument(
Expand Down Expand Up @@ -580,7 +583,8 @@ def get_latest_landing_zone(self, project_uuid, assay_uuid=None):
return lz_uuid, lz_irods_path

def _execute_md5_files_fix(
self, transfer_jobs: tuple[TransferJob, ...]
self, transfer_jobs: tuple[TransferJob, ...],
parallel_jobs: int = 8
) -> tuple[TransferJob, ...]:
"""Create missing MD5 files."""
ok_jobs = []
Expand All @@ -598,14 +602,14 @@ def _execute_md5_files_fix(
sizeof_fmt(total_bytes),
self.args.num_parallel_transfers,
)
logger.info("Missing MD5 files:\n%s", "\n".join(map(lambda j: j.path_src, todo_jobs)))
logger.info("Missing MD5 files:\n%s", "\n".join(map(lambda j: j.path_local, todo_jobs)))
counter = Value(c_ulonglong, 0)
with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t:
if self.args.num_parallel_transfers == 0: # pragma: nocover
if parallel_jobs == 0: # pragma: nocover
for job in todo_jobs:
compute_md5sum(job, counter, t)
else:
pool = ThreadPool(processes=self.args.num_parallel_transfers)
pool = ThreadPool(processes=parallel_jobs)
for job in todo_jobs:
pool.apply_async(compute_md5sum, args=(job, counter, t))
pool.close()
Expand Down Expand Up @@ -653,7 +657,7 @@ def execute(self) -> typing.Optional[int]:
logger.info("Libraries in sheet:\n%s", "\n".join(sorted(library_names)))

lz_uuid, transfer_jobs = self.build_jobs(library_names)
logger.debug("Transfer jobs:\n%s", "\n".join(map(lambda x: x.to_oneline(), transfer_jobs)))
# logger.debug("Transfer jobs:\n%s", "\n".join(map(lambda x: x.to_oneline(), transfer_jobs)))

if self.fix_md5_files:
transfer_jobs = self._execute_md5_files_fix(transfer_jobs)
Expand All @@ -671,7 +675,7 @@ def execute(self) -> typing.Optional[int]:
# logger.info("Aborting at your request.")
# sys.exit(0)
# This does support "num_parallel_transfers" (but it may automatically use multiple transfer threads?)
itransfer.put(recursive=True, sync=self.args.remote_overwrite)
itransfer.put(recursive=True, sync=self.args.overwrite_remote)
logger.info("File transfer complete.")


Expand Down Expand Up @@ -778,9 +782,9 @@ class FileWithSize:

def compute_md5sum(job: TransferJob, counter: Value, t: tqdm.tqdm) -> None:
"""Compute MD5 sum with ``md5sum`` command."""
dirname = os.path.dirname(job.path_src)
filename = os.path.basename(job.path_src)[: -len(".md5")]
path_md5 = job.path_src
dirname = os.path.dirname(job.path_local)
filename = os.path.basename(job.path_local)[: -len(".md5")]
path_md5 = job.path_local

md5sum_argv = ["md5sum", filename]
logger.debug("Computing MD5sum %s > %s", " ".join(md5sum_argv), filename + ".md5")
Expand All @@ -797,7 +801,7 @@ def compute_md5sum(job: TransferJob, counter: Value, t: tqdm.tqdm) -> None:
raise e

with counter.get_lock():
counter.value = os.path.getsize(job.path_src[: -len(".md5")])
counter.value = os.path.getsize(job.path_local[: -len(".md5")])
try:
t.update(counter.value)
except TypeError:
Expand Down

0 comments on commit 838d1bb

Please sign in to comment.