Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update sodar_cli & python-irods client for base functionality #263

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies = [
"icdiff>=2.0.7",
"logzero>=1.7.0",
"pandas>=2.2.3",
"python-irodsclient==1.1.8",
"python-irodsclient==3.0.0",
"pyyaml>=6.0.2",
"requests>=2.32.3",
"retrying>=1.3.4",
Expand All @@ -32,7 +32,7 @@ dependencies = [
"vcfpy >=0.13.8",
"altamisa @ git+https://github.com/bihealth/altamisa.git@817dc491ff819e4c80686082bf3e5f602f1ac14c",
"biomedsheets @ git+https://github.com/bihealth/biomedsheets@4e0a8484850c39d1511036c3fe29ec0b4f9271f8",
"sodar-cli @ git+https://github.com/bihealth/sodar-cli@a62505ff9b1365f150bce54c9b2b5e638f245f86",
"sodar-cli @ git+https://github.com/bihealth/sodar-cli@93a2a590df6c03abcd3f433a37ceb792aba5e7af",
]

[project.license]
Expand Down
1 change: 1 addition & 0 deletions src/cubi_tk/irods_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
import re
from typing import Iterable, Union
import typing

import attrs
from irods.collection import iRODSCollection
Expand Down
137 changes: 119 additions & 18 deletions src/cubi_tk/sea_snap/itransfer_results.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,79 @@
"""``cubi-tk sea-snap itransfer-ngs-mapping``: transfer ngs_mapping results into iRODS landing zone."""

import argparse
import attr
from ctypes import c_ulonglong
from multiprocessing import Value
from multiprocessing.pool import ThreadPool
import os
import pathlib
import re
from subprocess import SubprocessError, check_output
from subprocess import STDOUT, SubprocessError, check_call, check_output
from retrying import retry
import sys
import typing

from logzero import logger
import tqdm

from ..common import check_irods_icommands, sizeof_fmt
from ..snappy.itransfer_common import SnappyItransferCommandBase, TransferJob
from ..snappy.itransfer_common import SnappyItransferCommandBase

#: Default number of parallel transfers.
DEFAULT_NUM_TRANSFERS = 8

@attr.s(frozen=True, auto_attribs=True)
class TransferJob:
    """One file transfer from the local file system into the remote iRODS collection.

    Instances are immutable (``frozen=True``) so they can be shared safely
    between worker threads.
    """

    #: Path of the file on the local file system.
    path_src: str

    #: Path of the target data object in iRODS.
    path_dest: str

    #: Size of the payload in bytes.
    bytes: int

    #: Optional command(s) used to perform the transfer, if any.
    command: typing.Optional[str] = None

    def to_oneline(self):
        """Return a compact single-line description of this job."""
        return f"{self.path_src} -> {self.path_dest} ({self.bytes}) [{self.command}]"


@retry(wait_fixed=1000, stop_max_attempt_number=5)
def _wait_until_ils_succeeds(path):
    """Block until ``ils`` can list *path*, retrying up to five times (1s apart)."""
    ils_argv = ["ils", path]
    check_output(ils_argv, stderr=STDOUT)


@retry(wait_fixed=1000, stop_max_attempt_number=5)
def irsync_transfer(job: TransferJob, counter: Value, t: tqdm.tqdm):
    """Perform one transfer job and update the global progress state.

    Creates the destination collection with ``imkdir -p``, waits until it is
    visible via ``ils``, then copies the file with ``irsync -a -K`` (with
    checksum verification).  The whole operation is retried up to five times.

    :param job: the transfer job (source path, destination path, byte count).
    :param counter: shared byte counter accumulated across worker threads.
    :param t: progress bar, advanced by this job's byte count.
    """
    mkdir_argv = ["imkdir", "-p", os.path.dirname(job.path_dest)]
    logger.debug("Creating directory when necessary: %s", " ".join(mkdir_argv))
    try:
        check_output(mkdir_argv)
    except SubprocessError as e:  # pragma: nocover
        logger.error("Problem executing imkdir: %s (probably retrying)", e)
        raise

    _wait_until_ils_succeeds(os.path.dirname(job.path_dest))

    irsync_argv = ["irsync", "-a", "-K", job.path_src, "i:%s" % job.path_dest]
    logger.debug("Transferring file: %s", " ".join(irsync_argv))
    try:
        check_output(irsync_argv)
    except SubprocessError as e:  # pragma: nocover
        logger.error("Problem executing irsync: %s (probably retrying)", e)
        raise

    with counter.get_lock():
        # Accumulate into the shared counter; the lock makes the
        # read-modify-write safe.  The previous plain assignment
        # (``counter.value = job.bytes``) silently discarded all
        # previously counted bytes.
        counter.value += job.bytes
        try:
            # tqdm.update() takes a delta, so advance by this job's bytes only.
            t.update(job.bytes)
        except TypeError:
            pass  # swallow, pyfakefs and multiprocessing don't like each other


class SeasnapItransferMappingResultsCommand(SnappyItransferCommandBase):
"""Implementation of sea-snap itransfer command for ngs_mapping results."""
Expand Down Expand Up @@ -171,31 +225,78 @@ def execute(self) -> typing.Optional[int]:
logger.info("All done")
return None

def _execute_md5_files_fix(
    self, transfer_jobs: typing.Tuple[TransferJob, ...]
) -> typing.Tuple[TransferJob, ...]:
    """Create missing MD5 files.

    Partition ``transfer_jobs`` into jobs whose ``.md5`` file already exists
    and jobs for which it is missing, compute the missing checksum files
    (optionally in parallel), and return all jobs sorted, with the byte
    counts of the freshly created MD5 files filled in.
    """
    ok_jobs, todo_jobs = [], []
    for job in transfer_jobs:
        bucket = todo_jobs if not os.path.exists(job.path_src) else ok_jobs
        bucket.append(job)

    # ``path_src`` names the ``.md5`` file; size accounting is based on the
    # data file it belongs to (the same path without the ``.md5`` suffix).
    total_bytes = sum(os.path.getsize(j.path_src[: -len(".md5")]) for j in todo_jobs)
    logger.info(
        "Computing MD5 sums for %s files of %s with up to %d processes",
        len(todo_jobs),
        sizeof_fmt(total_bytes),
        self.args.num_parallel_transfers,
    )
    logger.info("Missing MD5 files:\n%s", "\n".join(j.path_src for j in todo_jobs))
    counter = Value(c_ulonglong, 0)
    with tqdm.tqdm(total=total_bytes, unit="B", unit_scale=True) as t:
        if self.args.num_parallel_transfers == 0:  # pragma: nocover
            # Serial fallback, mainly useful for debugging/testing.
            for job in todo_jobs:
                compute_md5sum(job, counter, t)
        else:
            pool = ThreadPool(processes=self.args.num_parallel_transfers)
            for job in todo_jobs:
                pool.apply_async(compute_md5sum, args=(job, counter, t))
            pool.close()
            pool.join()

    # Re-create the finished jobs so that ``bytes`` reflects the size of the
    # MD5 file that now exists on disk.
    done_jobs = [
        TransferJob(
            path_src=j.path_src,
            path_dest=j.path_dest,
            bytes=os.path.getsize(j.path_src),
            command=j.command,
        )
        for j in todo_jobs
    ]
    return tuple(sorted(done_jobs + ok_jobs))


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    """Setup argument parser for ``cubi-tk sea-snap itransfer-results``."""
    command_class = SeasnapItransferMappingResultsCommand
    return command_class.setup_argparse(parser)


def compute_md5sum(job: TransferJob, counter: Value, t: tqdm.tqdm) -> None:
    """Compute MD5 sum with ``md5sum`` command.

    ``job.path_src`` is the path of the ``.md5`` file to create; the checksum
    is computed over the corresponding data file (same path without suffix).

    :param job: transfer job whose ``path_src`` names the MD5 file to write.
    :param counter: shared byte counter accumulated across worker threads.
    :param t: progress bar, advanced by the size of the checksummed file.
    :raises SubprocessError: if ``md5sum`` fails (the partial output file is
        removed before re-raising).
    """
    dirname = os.path.dirname(job.path_src)
    filename = os.path.basename(job.path_src)[: -len(".md5")]
    path_md5 = job.path_src

    md5sum_argv = ["md5sum", filename]
    logger.debug("Computing MD5sum %s > %s", " ".join(md5sum_argv), filename + ".md5")
    try:
        with open(path_md5, "wt") as md5f:
            # Run in the data file's directory so the file name recorded in
            # the .md5 file is relative, not absolute.
            check_call(md5sum_argv, cwd=dirname, stdout=md5f)
    except SubprocessError as e:  # pragma: nocover
        logger.error("Problem executing md5sum: %s", e)
        logger.info("Removing file after error: %s", path_md5)
        try:
            os.remove(path_md5)
        except OSError as e_rm:  # pragma: nocover
            logger.error("Could not remove file: %s", e_rm)
        raise e

    with counter.get_lock():
        # Accumulate the size of the checksummed data file into the shared
        # counter (read-modify-write under the lock) and advance the progress
        # bar by the same delta.
        size = os.path.getsize(job.path_src[: -len(".md5")])
        counter.value += size
        try:
            t.update(size)
        except TypeError:
            pass  # swallow, pyfakefs and multiprocessing don't like each other
Expand Down
Loading
Loading