From 3fc38af8fc913f58845b1efae15dcabbfc75b919 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nicolai=20von=20K=C3=BCgelgen?=
Date: Mon, 8 Apr 2024 14:19:12 +0200
Subject: [PATCH] feat: irods download refactoring and new generic sodar
 downloader with preset for dragen data (#226) (#227)

---
 cubi_tk/irods/check.py                        | 105 +----
 cubi_tk/irods_common.py                       | 128 ++++++-
 cubi_tk/snappy/check_remote.py                |   7 +-
 cubi_tk/snappy/pull_data_common.py            |  65 +---
 cubi_tk/snappy/pull_processed_data.py         |   7 +-
 cubi_tk/snappy/pull_raw_data.py               |   7 +-
 cubi_tk/sodar/__init__.py                     |   7 +
 cubi_tk/sodar/check_remote.py                 |  11 +-
 cubi_tk/sodar/pull_data_collection.py         | 362 ++++++++++++++++++
 ...ve_irods_collection.py => sodar_common.py} |  87 ++---
 tests/test_irods_common.py                    |  48 ++-
 tests/test_sodar_pull_data_collection.py      | 179 +++++++++
 12 files changed, 794 insertions(+), 219 deletions(-)
 create mode 100644 cubi_tk/sodar/pull_data_collection.py
 rename cubi_tk/{snappy/retrieve_irods_collection.py => sodar_common.py} (54%)
 create mode 100644 tests/test_sodar_pull_data_collection.py

diff --git a/cubi_tk/irods/check.py b/cubi_tk/irods/check.py
index 05305367..401d861e 100644
--- a/cubi_tk/irods/check.py
+++ b/cubi_tk/irods/check.py
@@ -1,68 +1,46 @@
 """``cubi-tk irods check``: Check target iRODS collection (all md5 files? metadata md5 consistent? enough replicas?)."""
 import argparse
-from contextlib import contextmanager
 import json
 from multiprocessing.pool import ThreadPool
 import os
 import re
 import typing
 
-from irods.collection import iRODSCollection
-from irods.column import Like
 from irods.data_object import iRODSDataObject
-from irods.models import Collection as CollectionModel
-from irods.models import DataObject as DataObjectModel
-from irods.session import iRODSSession
 from logzero import logger
 import tqdm
 
+from ..irods_common import DEFAULT_HASH_SCHEME, HASH_SCHEMES, iRODSRetrieveCollection
+
 MIN_NUM_REPLICAS = 2
 NUM_PARALLEL_TESTS = 4
 NUM_DISPLAY_FILES = 20
-HASH_SCHEMES = {
-    "MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
-    "SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
-}
-DEFAULT_HASH_SCHEME = "MD5"
 
 
-class IrodsCheckCommand:
+class IrodsCheckCommand(iRODSRetrieveCollection):
     """Implementation of iRODS check command."""
 
     command_name = "check"
 
-    def __init__(self, args):
-        #: Command line arguments.
-        self.args = args
+    def __init__(self, args, hash_scheme=DEFAULT_HASH_SCHEME, ask=False, irods_env_path=None):
+        """Constructor.
 
-        #: Path to iRODS environment file
-        self.irods_env_path = os.path.join(
-            os.path.expanduser("~"), ".irods", "irods_environment.json"
-        )
+        :param args: argparse object with command line arguments.
+        :type args: argparse.Namespace
 
-        #: iRODS environment
-        self.irods_env = None
+        :param hash_scheme: iRODS hash scheme, default MD5.
+        :type hash_scheme: str, optional
 
-    def _init_irods(self):
-        """Connect to iRODS."""
-        try:
-            return iRODSSession(irods_env_file=self.irods_env_path)
-        except Exception as e:
-            logger.error("iRODS connection failed: %s", self.get_irods_error(e))
-            logger.error("Are you logged in? try 'iinit'")
-            raise
+        :param ask: Confirm with user before certain actions.
+        :type ask: bool, optional
 
-    @contextmanager
-    def _get_irods_sessions(self, count=NUM_PARALLEL_TESTS):
-        if count < 1:
-            count = 1
-        irods_sessions = [self._init_irods() for _ in range(count)]
-        try:
-            yield irods_sessions
-        finally:
-            for irods in irods_sessions:
-                irods.cleanup()
+        :param irods_env_path: Path to irods_environment.json
+        :type irods_env_path: pathlib.Path, optional
+        """
+        super().__init__(hash_scheme, ask, irods_env_path)
+        #: Command line arguments.
+        self.args = args
 
     @classmethod
     def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
@@ -100,40 +78,6 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
         )
         parser.add_argument("irods_path", help="Path to an iRODS collection.")
 
-    @classmethod
-    def get_irods_error(cls, e: Exception):
-        """Return logger friendly iRODS exception."""
-        es = str(e)
-        return es if es != "None" else e.__class__.__name__
-
-    def get_data_objs(
-        self, root_coll: iRODSCollection
-    ) -> typing.Dict[
-        str, typing.Union[typing.Dict[str, iRODSDataObject], typing.List[iRODSDataObject]]
-    ]:
-        """Get data objects recursively under the given iRODS path."""
-        data_objs = dict(files=[], checksums={})
-        ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.args.hash_scheme.upper()]
-        irods_sess = root_coll.manager.sess
-
-        query = irods_sess.query(DataObjectModel, CollectionModel).filter(
-            Like(CollectionModel.name, f"{root_coll.path}%")
-        )
-
-        for res in query:
-            # If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional, likely because a name/path/... attribute is overwritten somewhere
-            coll_res = {k: v for k, v in res.items() if k.icat_id >= 500}
-            obj_res = {k: v for k, v in res.items() if k.icat_id < 500}
-            coll = iRODSCollection(root_coll.manager, coll_res)
-            obj = iRODSDataObject(irods_sess.data_objects, parent=coll, results=[obj_res])
-
-            if obj.path.endswith("." + self.args.hash_scheme.lower()):
-                data_objs["checksums"][obj.path] = obj
-            elif obj.path.split(".")[-1] not in ignore_schemes:
-                data_objs["files"].append(obj)
-
-        return data_objs
-
     def check_args(self, _args):
         # Check hash scheme
         if _args.hash_scheme.upper() not in HASH_SCHEMES:
@@ -170,18 +114,9 @@ def execute(self):
         logger.info("iRODS environment: %s", irods_env)
 
         # Connect to iRODS
-        with self._get_irods_sessions(self.args.num_parallel_tests) as irods_sessions:
-            try:
-                root_coll = irods_sessions[0].collections.get(self.args.irods_path)
-                logger.info(
-                    "{} iRODS connection{} initialized".format(
-                        len(irods_sessions), "s" if len(irods_sessions) != 1 else ""
-                    )
-                )
-            except Exception as e:
-                logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
-                raise
-
+        with self.session as irods_session:
+            root_coll = irods_session.collections.get(self.args.irods_path)
+            logger.info("1 iRODS connection initialized")
         # Get files and run checks
         logger.info("Querying for data objects")
         data_objs = self.get_data_objs(root_coll)
diff --git a/cubi_tk/irods_common.py b/cubi_tk/irods_common.py
index cab5f412..a05cea88 100644
--- a/cubi_tk/irods_common.py
+++ b/cubi_tk/irods_common.py
@@ -1,15 +1,23 @@
+from collections import defaultdict
 import getpass
 import os.path
 from pathlib import Path
-from typing import Iterable
+import re
+from typing import Iterable, Union
 
 import attrs
+from irods.collection import iRODSCollection
+from irods.column import Like
+from irods.data_object import iRODSDataObject
 from irods.exception import (
     CAT_INVALID_AUTHENTICATION,
     CAT_INVALID_USER,
     CAT_PASSWORD_EXPIRED,
     PAM_AUTH_PASSWORD_FAILED,
 )
+from irods.keywords import FORCE_FLAG_KW
+from irods.models import Collection as CollectionModel
+from irods.models import DataObject as DataObjectModel
 from irods.password_obfuscation import encode
 from irods.session import NonAnonymousLoginWithoutPassword, iRODSSession
 import logzero
@@ -20,6 +28,13 @@
 formatter = logzero.LogFormatter(fmt="%(message)s")
 output_logger = logzero.setup_logger(formatter=formatter)
 
+#: Default hash scheme. Although iRODS provides alternatives, the whole `snappy` pipeline uses MD5.
+HASH_SCHEMES = {
+    "MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
+    "SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
+}
+DEFAULT_HASH_SCHEME = "MD5"
+
 
 @attrs.frozen(auto_attribs=True)
 class TransferJob:
@@ -219,7 +234,7 @@ def chksum(self):
                     logger.error("Problem during iRODS checksumming.")
                     logger.error(self.get_irods_error(e))
 
-    def get(self):
+    def get(self, force_overwrite: bool = False):
         """Download files from SODAR."""
         with self.session as session:
             self.__jobs = [
@@ -227,6 +242,10 @@ def get(self):
                 for job in self.__jobs
             ]
             self.__total_bytes = sum([job.bytes for job in self.__jobs])
+
+        kw_options = {}
+        if force_overwrite:
+            kw_options = {FORCE_FLAG_KW: None}  # Keyword has no value, just needs to be present
         # Double tqdm for currently transferred file info
         with (
             tqdm(
@@ -242,9 +261,15 @@ def get(self):
                 file_log.set_description_str(
                     f"File [{n + 1}/{len(self.__jobs)}]: {Path(job.path_local).name}"
                 )
+                if os.path.exists(job.path_local) and not force_overwrite:  # pragma: no cover
+                    logger.info(
+                        f"{Path(job.path_local).name} already exists. Skipping, use force_overwrite to re-download."
+                    )
+                    continue
                 try:
+                    Path(job.path_local).parent.mkdir(parents=True, exist_ok=True)
                     with self.session as session:
-                        session.data_objects.get(job.path_remote, job.path_local)
+                        session.data_objects.get(job.path_remote, job.path_local, **kw_options)
                     t.update(job.bytes)
                 except FileNotFoundError:  # pragma: no cover
                     raise
@@ -252,3 +277,100 @@ def get(self):
                     logger.error(f"Problem during transfer of {job.path_remote}")
                     logger.error(self.get_irods_error(e))
                     t.clear()
+
+
+class iRODSRetrieveCollection(iRODSCommon):
+    """Class to retrieve data objects from an iRODS collection."""
+
+    def __init__(
+        self, hash_scheme: str = DEFAULT_HASH_SCHEME, ask: bool = False, irods_env_path: Path = None
+    ):
+        """Constructor.
+
+        :param hash_scheme: iRODS hash scheme, default MD5.
+        :type hash_scheme: str, optional
+
+        :param ask: Confirm with user before certain actions.
+        :type ask: bool, optional
+
+        :param irods_env_path: Path to irods_environment.json
+        :type irods_env_path: pathlib.Path, optional
+        """
+        super().__init__(ask, irods_env_path)
+        self.hash_scheme = hash_scheme
+
+    def retrieve_irods_data_objects(self, irods_path: str) -> dict[str, list[iRODSDataObject]]:
+        """Retrieve data objects from iRODS.
+
+        :param irods_path: iRODS path.
+
+        :return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
+        Value: list of iRODSDataObject (native python-irodsclient object).
+        """
+
+        # Connect to iRODS
+        with self.session as session:
+            try:
+                root_coll = session.collections.get(irods_path)
+
+                # Get files and run checks
+                logger.info("Querying for data objects")
+
+                if root_coll is not None:
+                    irods_data_objs = self._irods_query(session, root_coll)
+                    irods_obj_dict = self.parse_irods_collection(irods_data_objs)
+                    return irods_obj_dict
+
+            except Exception as e:  # pragma: no cover
+                logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
+                raise
+
+        return {}
+
+    def _irods_query(
+        self,
+        session: iRODSSession,
+        root_coll: iRODSCollection,
+    ) -> dict[str, Union[dict[str, iRODSDataObject], list[iRODSDataObject]]]:
+        """Get data objects recursively under the given iRODS path."""
+
+        ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.hash_scheme.upper()]
+
+        query = session.query(DataObjectModel, CollectionModel).filter(
+            Like(CollectionModel.name, f"{root_coll.path}%")
+        )
+
+        data_objs = dict(files=[], checksums={})
+        for res in query:
+            # If the 'res' dict is not split into Collection&Object the resulting iRODSDataObject is not fully functional,
+            # likely because a name/path/... attribute is overwritten somewhere
+            magic_icat_id_separator = 500
+            coll_res = {k: v for k, v in res.items() if k.icat_id >= magic_icat_id_separator}
+            obj_res = {k: v for k, v in res.items() if k.icat_id < magic_icat_id_separator}
+            coll = iRODSCollection(root_coll.manager, coll_res)
+            obj = iRODSDataObject(session.data_objects, parent=coll, results=[obj_res])
+
+            if obj.path.endswith("." + self.hash_scheme.lower()):
+                data_objs["checksums"][obj.path] = obj
+            elif obj.path.split(".")[-1] not in ignore_schemes:
+                data_objs["files"].append(obj)
+
+        return data_objs
+
+    @staticmethod
+    def parse_irods_collection(irods_data_objs) -> dict[str, list[iRODSDataObject]]:
+        """Parse iRODS collection
+
+        :param irods_data_objs: iRODS collection.
+        :type irods_data_objs: dict
+
+        :return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
+        Value: list of iRODSDataObject (native python-irodsclient object).
+        """
+        # Initialise variables
+        output_dict = defaultdict(list)
+
+        for obj in irods_data_objs["files"]:
+            output_dict[obj.name].append(obj)
+
+        return output_dict
diff --git a/cubi_tk/snappy/check_remote.py b/cubi_tk/snappy/check_remote.py
index 35dc6c67..c6bb5388 100644
--- a/cubi_tk/snappy/check_remote.py
+++ b/cubi_tk/snappy/check_remote.py
@@ -8,15 +8,14 @@
 from collections import defaultdict
 import os
 from pathlib import Path
-from types import SimpleNamespace
 import typing
 
 from biomedsheets import shortcuts
 from logzero import logger
 
 from ..common import load_toml_config
+from ..sodar_common import RetrieveSodarCollection
 from .common import get_biomedsheet_path, load_sheet_tsv
-from .retrieve_irods_collection import DEFAULT_HASH_SCHEME, RetrieveIrodsCollection
 
 
 class FindFilesCommon:
@@ -684,9 +683,7 @@ def execute(self) -> typing.Optional[int]:
             variant_caller_class = VariantCallingChecker
 
         # Find all remote files (iRODS)
-        pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
-        library_remote_files_dict = RetrieveIrodsCollection(
-            pseudo_args,
+        library_remote_files_dict = RetrieveSodarCollection(
             self.args.sodar_url,
             self.args.sodar_api_token,
             self.args.assay_uuid,
diff --git a/cubi_tk/snappy/pull_data_common.py b/cubi_tk/snappy/pull_data_common.py
index 049141bb..d00a9fcd 100644
--- a/cubi_tk/snappy/pull_data_common.py
+++ b/cubi_tk/snappy/pull_data_common.py
@@ -1,37 +1,38 @@
 from datetime import datetime
-import os
-from pathlib import Path
-from types import SimpleNamespace
+from typing import Dict, List
 
-from irods.exception import OVERWRITE_WITHOUT_FORCE_FLAG
-from irods.keywords import FORCE_FLAG_KW
+from irods.data_object import iRODSDataObject
 from logzero import logger
 from sodar_cli import api
 
-from ..irods.check import IrodsCheckCommand
-from .retrieve_irods_collection import DEFAULT_HASH_SCHEME
+from ..irods_common import TransferJob, iRODSTransfer
 
 #: Valid file extensions
 VALID_FILE_TYPES = ("bam", "vcf", "txt", "csv", "log")
 
 
-class PullDataCommon(IrodsCheckCommand):
+class PullDataCommon:
     """Implementation of common pull data methods."""
 
     #: File type dictionary. Key: file type; Value: additional expected extensions (tuple).
     file_type_to_extensions_dict = None
 
     def __init__(self):
-        IrodsCheckCommand.__init__(self, args=SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME))
-
-    def filter_irods_collection(self, identifiers, remote_files_dict, file_type):
+        pass
+
+    def filter_irods_collection(
+        self,
+        identifiers: List[str],
+        remote_files_dict: Dict[str, List[iRODSDataObject]],
+        file_type: str,
+    ) -> Dict[str, List[iRODSDataObject]]:
         """Filter iRODS collection based on identifiers (sample id or library name) and file type/extension.
 
         :param identifiers: List of sample identifiers or library names.
         :type identifiers: list
 
         :param remote_files_dict: Dictionary with iRODS collection information. Key: file name as string (e.g.,
-        'P001-N1-DNA1-WES1.vcf.gz'); Value: iRODS data (``iRODSDataObject``).
+        'P001-N1-DNA1-WES1.vcf.gz'); Value: list of iRODS data (``iRODSDataObject``).
         :type remote_files_dict: dict
 
         :param file_type: File type, example: 'bam' or 'vcf'.
@@ -104,7 +105,8 @@ def get_assay_uuid(self, sodar_url, sodar_api_token, project_uuid):
                 return _assay_uuid
         return None
 
-    def get_irods_files(self, irods_local_path_pairs, force_overwrite=False):
+    @staticmethod
+    def get_irods_files(irods_local_path_pairs, force_overwrite=False):
         """Get iRODS files
 
        Retrieves iRODS path and stores it locally.
@@ -115,37 +117,12 @@ def get_irods_files(irods_local_path_pairs, force_overwrite=False):
         :param force_overwrite: Flag to indicate if local files should be overwritten.
         :type force_overwrite: bool
         """
-        kw_options = {}
-        if force_overwrite:
-            kw_options = {FORCE_FLAG_KW: None}  # Keyword has no value, just needs to be present
-        # Connect to iRODS
-        with self._get_irods_sessions(count=1) as irods_sessions:
-            try:
-                for pair in irods_local_path_pairs:
-                    # Set variable
-                    file_name = pair[0].split("/")[-1]
-                    irods_path = pair[0]
-                    local_out_path = pair[1]
-                    # Create output directory if necessary
-                    Path(local_out_path).parent.mkdir(parents=True, exist_ok=True)
-                    # Get file
-                    if os.path.exists(local_out_path) and not force_overwrite:
-                        logger.info(f"{file_name} already exists. Force_overwrite to re-download.")
-                    else:
-                        logger.info(f"Retrieving '{file_name}' from: {irods_path}")
-                        irods_sessions[0].data_objects.get(irods_path, local_out_path, **kw_options)
-
-            except OVERWRITE_WITHOUT_FORCE_FLAG:
-                logger.error(
-                    f"Failed to retrieve '{file_name}', it already exists in output directory: {local_out_path}"
-                )
-                raise
-
-            except Exception as e:
-                logger.error(f"Failed to retrieve iRODS path: {irods_path}")
-                logger.error(f"Attempted to copy file to directory: {local_out_path}")
-                logger.error(self.get_irods_error(e))
-                raise
+
+        transfer_jobs = [
+            TransferJob(local_out_path, irods_path)
+            for irods_path, local_out_path in irods_local_path_pairs
+        ]
+        iRODSTransfer(transfer_jobs).get(force_overwrite)
 
     @staticmethod
     def report_no_file_found(available_files):
diff --git a/cubi_tk/snappy/pull_processed_data.py b/cubi_tk/snappy/pull_processed_data.py
index a2620486..5a1ed2be 100644
--- a/cubi_tk/snappy/pull_processed_data.py
+++ b/cubi_tk/snappy/pull_processed_data.py
@@ -7,16 +7,15 @@
 """
 import argparse
 import os
-from types import SimpleNamespace
 import typing
 
 from logzero import logger
 
 from ..common import load_toml_config
+from ..sodar_common import RetrieveSodarCollection
 from .common import get_biomedsheet_path, load_sheet_tsv
 from .parse_sample_sheet import ParseSampleSheet
 from .pull_data_common import PullDataCommon
-from .retrieve_irods_collection import DEFAULT_HASH_SCHEME, RetrieveIrodsCollection
 
 #: Valid file extensions
 VALID_FILE_TYPES = ("bam", "vcf", "txt", "csv", "log")
@@ -206,9 +205,7 @@ def execute(self) -> typing.Optional[int]:
         )
 
         # Find all remote files (iRODS)
-        pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
-        remote_files_dict = RetrieveIrodsCollection(
-            pseudo_args,
+        remote_files_dict = RetrieveSodarCollection(
             self.args.sodar_url,
             self.args.sodar_api_token,
             self.args.assay_uuid,
diff --git a/cubi_tk/snappy/pull_raw_data.py b/cubi_tk/snappy/pull_raw_data.py
index a70e52a7..9aac86fd 100644
--- a/cubi_tk/snappy/pull_raw_data.py
+++ b/cubi_tk/snappy/pull_raw_data.py
@@ -10,7 +10,6 @@
 from collections import defaultdict
 import os
 import pathlib
-from types import SimpleNamespace
 import typing
 
 import attr
@@ -18,10 +17,10 @@
 import yaml
 
 from ..common import load_toml_config
+from ..sodar_common import RetrieveSodarCollection
 from .common import find_snappy_root_dir, get_biomedsheet_path, load_sheet_tsv
 from .parse_sample_sheet import ParseSampleSheet
 from .pull_data_common import PullDataCommon
-from .retrieve_irods_collection import DEFAULT_HASH_SCHEME, RetrieveIrodsCollection
 
 
 @attr.s(frozen=True, auto_attribs=True)
@@ -189,9 +188,7 @@ def execute(self) -> typing.Optional[int]:
         )
 
         # Find all remote files (iRODS)
-        pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
-        remote_files_dict = RetrieveIrodsCollection(
-            pseudo_args,
+        remote_files_dict = RetrieveSodarCollection(
             self.config.sodar_url,
             self.config.sodar_api_token,
             self.config.assay_uuid,
diff --git a/cubi_tk/sodar/__init__.py b/cubi_tk/sodar/__init__.py
index 42550452..5e01ffa6 100644
--- a/cubi_tk/sodar/__init__.py
+++ b/cubi_tk/sodar/__init__.py
@@ -30,6 +30,9 @@
 ``add-ped``
     Download sample sheet, add rows from PED file, and re-upload.
 
+``pull-data-collection``
+    Download data collections from iRODS.
+
 ``pull-raw-data``
     Download raw data from iRODS for samples from the sample sheet.
 
@@ -62,6 +65,7 @@
 from .lz_list import setup_argparse as setup_argparse_lz_list
 from .lz_move import setup_argparse as setup_argparse_lz_move
 from .lz_validate import setup_argparse as setup_argparse_lz_validate
+from .pull_data_collection import setup_argparse as setup_argparse_pull_data_collection
 from .pull_raw_data import setup_argparse as setup_argparse_pull_raw_data
 from .upload_sheet import setup_argparse as setup_argparse_upload_sheet
 
@@ -77,6 +81,9 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
     setup_argparse_upload_sheet(
         subparsers.add_parser("upload-sheet", help="Upload and replace ISA-tab")
     )
+    setup_argparse_pull_data_collection(
+        subparsers.add_parser("pull-data-collection", help="Download data collections from iRODS")
+    )
     setup_argparse_pull_raw_data(
         subparsers.add_parser("pull-raw-data", help="Download raw data from iRODS")
     )
diff --git a/cubi_tk/sodar/check_remote.py b/cubi_tk/sodar/check_remote.py
index e926ffeb..583dfc82 100644
--- a/cubi_tk/sodar/check_remote.py
+++ b/cubi_tk/sodar/check_remote.py
@@ -14,7 +14,6 @@
 from collections import defaultdict
 import os
 from pathlib import Path
-from types import SimpleNamespace
 import typing
 
 import attr
@@ -23,11 +22,7 @@
 from ..common import compute_md5_checksum, load_toml_config
 from ..exceptions import FileMd5MismatchException
 from ..snappy.check_remote import Checker as SnappyChecker
-from ..snappy.retrieve_irods_collection import RetrieveIrodsCollection
-
-# RetrieveIrodsCollection needs this in the namespace
-# either define it explicitly or import it
-DEFAULT_HASH_SCHEME = "MD5"
+from ..sodar_common import RetrieveSodarCollection
 
 
 # Adapted from snappy.retrieve_irods_collection
@@ -393,9 +388,7 @@ def execute(self) -> typing.Optional[int]:
         logger.info(" args: %s", self.args)
 
         # Find all remote files (iRODS)
-        pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
-        irodscollector = RetrieveIrodsCollection(
-            pseudo_args,
+        irodscollector = RetrieveSodarCollection(
             self.args.sodar_url,
             self.args.sodar_api_token,
             self.args.assay_uuid,
diff --git a/cubi_tk/sodar/pull_data_collection.py b/cubi_tk/sodar/pull_data_collection.py
new file mode 100644
index 00000000..84383dfb
--- /dev/null
+++ b/cubi_tk/sodar/pull_data_collection.py
@@ -0,0 +1,362 @@
+"""``cubi-tk sodar pull-data-collection``: download data collections from iRODS via SODAR."""
+
+import argparse
+from collections import defaultdict
+import os
+from pathlib import PurePosixPath
+import re
+
+from irods.data_object import iRODSDataObject
+from logzero import logger
+import pandas as pd
+
+from ..common import load_toml_config
+from ..irods_common import TransferJob, iRODSTransfer
+from ..snappy.pull_data_common import PullDataCommon
+from ..sodar_common import RetrieveSodarCollection
+
+
+class PullDataCollection(PullDataCommon):
+    """Implementation of pull data collection command."""
+
+    command_name = "pull-data-collection"
+
+    presets = {
+        "dragen": [
+            "**/*_FAM_dragen.fam.hard-filtered.vcf.gz",
+            "**/*_FAM_dragen.fam.hard-filtered.vcf.gz.tbi",
+            "**/*_FAM_dragen.fam.cnv.vcf.gz",
+            "**/*_FAM_dragen.fam.cnv.vcf.gz.tbi",
+            "**/*_FAM_dragen.fam.sv.vcf.gz",
+            "**/*_FAM_dragen.fam.sv.vcf.gz.tbi",
+            "**/*.qc-coverage*.csv",
+            "**/*.ped",
+            "**/*.mapping_metrics.csv",
+        ],
+    }
+
+    def __init__(self, args):
+        """Constructor.
+
+        :param args: argparse object with command line arguments.
+        :type args: argparse.Namespace
+        """
+        super().__init__()
+        #: Command line arguments.
+        self.args = args
+
+    @classmethod
+    def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
+        """Setup arguments for ``pull-data-collection`` command."""
+        parser.add_argument(
+            "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS
+        )
+        parser.add_argument(
+            "--sodar-url",
+            default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"),
+            help="URL to SODAR, defaults to SODAR_URL environment variable or fallback to https://sodar.bihealth.org/",
+        )
+        parser.add_argument(
+            "--sodar-api-token",
+            default=os.environ.get("SODAR_API_TOKEN", None),
+            help="Authentication token when talking to SODAR. Defaults to SODAR_API_TOKEN environment variable.",
+        )
+
+        parser.add_argument(
+            "--overwrite",
+            default=False,
+            action="store_true",
+            help="Allow overwriting of local files.",
+        )
+        parser.add_argument(
+            "--assay-uuid",
+            default=None,
+            type=str,
+            help="UUID of the assay to pull data from. Used to specify the target while dealing with multi-assay projects.",
+        )
+
+        group_files = parser.add_mutually_exclusive_group(required=True)
+
+        group_files.add_argument(
+            "-p", "--preset", help="Preset to use for file selection.", choices=cls.presets.keys()
+        )
+        group_files.add_argument(
+            "-f", "--file-pattern", help="File pattern to use for file selection.", nargs="+"
+        )
+        group_files.add_argument(
+            "-a",
+            "--all-files",
+            action="store_true",
+            help="Do not filter files, download everything.",
+        )
+
+        group_samples = parser.add_argument_group("Sample Filters")
+        group_samples.add_argument(
+            "--substring-match",
+            action="store_true",
+            help="Defined samples do not need to match collections exactly, a substring match is enough.",
+        )
+
+        group_samples.add_argument(
+            "-s",
+            "--sample-list",
+            nargs="+",
+            help="Sample list used for filtering collections. "
+            "Takes precedence over --tsv and --biomedsheet.",
+        )
+        group_samples.add_argument(
+            "--biomedsheet",
+            help="Biomedsheet file for filtering collections. Sets tsv-column to 2 and "
+            "tsv-skip-rows to 12. Takes precedence over --tsv.",
+        )
+        group_samples.add_argument(
+            "--tsv", help="Tabular file with sample names to use for filtering collections."
+        )
+        group_samples.add_argument(
+            "--tsv-column",
+            default=1,
+            help="Column index for sample entries in tsv file. Default: 1.",
+        )
+        group_samples.add_argument(
+            "--tsv-skip-rows", default=0, help="Number of header lines in tsv file. Default: 0."
+        )
+
+        parser.add_argument(
+            "-o", "--output-dir", help="Output directory. Default: $PWD", default=os.getcwd()
+        )
+        parser.add_argument(
+            "--output-pattern",
+            default="{collection}/{subcollections}/{filename}",
+            help="Pattern for output files. Default: '{collection}/{subcollections}/{filename}'",
+        )
+        parser.add_argument(
+            "--output-regex",
+            nargs=3,
+            action="append",
+            metavar=("FILEPART", "MATCH", "REPL"),
+            default=[],
+            type=str,
+            help="Regular expression to change parts from iRODS path for output pattern. "
+            "Syntax: 'collection|subcollections|filename' 'regex' 'replacement'. "
+            "Can be given multiple times. Default: None",
+        )
+
+        parser.add_argument(
+            "project_uuid",
+            help="SODAR project UUID",
+        )
+
+    @classmethod
+    def run(
+        cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
+    ) -> int | None:
+        """Entry point into the command."""
+        return cls(args).execute()
+
+    def check_args(self, args) -> int:
+        """Called for checking arguments."""
+        res = 0
+
+        # If SODAR info not provided, fetch from user's toml file
+        toml_config = load_toml_config(args)
+        args.sodar_url = args.sodar_url or toml_config.get("global", {}).get("sodar_server_url")
+        args.sodar_api_token = args.sodar_api_token or toml_config.get("global", {}).get(
+            "sodar_api_token"
+        )
+
+        # Validate output directory path
+        if not os.path.exists(args.output_dir):
+            try:
+                os.makedirs(args.output_dir)
+            except Exception as e:
+                logger.error(
+                    f"Output directory path does not exist and cannot be created: {args.output_dir}"
+                )
+                logger.debug(e)
+                res = 1
+        elif not os.access(args.output_dir, os.W_OK):
+            logger.error(f"Output directory path is not writable: {args.output_dir}")
+            res = 1
+
+        return res
+
+    def execute(self) -> int | None:
+        """Execute the transfer."""
+        res = self.check_args(self.args)
+        if res:  # pragma: no cover
+            return res
+
+        logger.info("Starting cubi-tk sodar pull-data-collection")
+        logger.info(" args: %s", self.args)
+
+        # Get list of sample ids
+        if self.args.sample_list:
+            samples = set(self.args.sample_list)
+        elif self.args.biomedsheet:
+            samples = self.parse_sample_tsv(self.args.biomedsheet, sample_col=2, skip_rows=12)
+        elif self.args.tsv:
+            samples = self.parse_sample_tsv(
+                self.args.tsv, sample_col=self.args.tsv_column, skip_rows=self.args.tsv_skip_rows
+            )
+        else:
+            samples = None
+
+        # Find all remote files (iRODS)
+        filesearcher = RetrieveSodarCollection(
+            self.args.sodar_url,
+            self.args.sodar_api_token,
+            self.args.assay_uuid,
+            self.args.project_uuid,
+        )
+
+        remote_files_dict = filesearcher.perform()
+        assay_path = filesearcher.get_assay_irods_path(self.args.assay_uuid)
+
+        if self.args.all_files:
+            file_patterns = []
+        elif self.args.preset:
+            file_patterns = self.presets[self.args.preset]
+        else:  # self.args.file_pattern
+            file_patterns = self.args.file_pattern
+
+        filtered_remote_files_dict = self.filter_irods_file_list(
+            remote_files_dict, assay_path, file_patterns, samples, self.args.substring_match
+        )
+
+        if len(filtered_remote_files_dict) == 0:
+            self.report_no_file_found(available_files=[*remote_files_dict])
+            return 0
+
+        # Pair iRODS path with output path
+        transfer_jobs = self.build_download_jobs(
+            remote_files_dict=filtered_remote_files_dict,
+            assay_path=assay_path,
+        )
+
+        # Retrieve files from iRODS
+        iRODSTransfer(transfer_jobs).get(self.args.overwrite)
+
+        logger.info("All done. Have a nice day!")
+        return 0
+
+    @staticmethod
+    def parse_sample_tsv(tsv_path, sample_col=1, skip_rows=0, skip_comments=True) -> set[str]:
+        extra_args = {"comment": "#"} if skip_comments else {}
+        df = pd.read_csv(tsv_path, sep="\t", skiprows=skip_rows, **extra_args)
+        try:
+            samples = set(df.iloc[:, sample_col - 1])
+        except IndexError:
+            logger.error(
+                f"Error extracting column no. {sample_col} from {tsv_path}, only {len(df.columns)} columns were detected."
+            )
+            raise
+
+        return samples
+
+    @staticmethod
+    def filter_irods_file_list(
+        remote_files_dict: dict[str, list[iRODSDataObject]],
+        common_assay_path: str,
+        file_patterns: list[str],
+        samples: set[str],
+        substring_match: bool = False,
+    ) -> dict[str, list[iRODSDataObject]]:
+        """Filter iRODS collection based on identifiers (sample id or library name) and file type/extension.
+
+        :param remote_files_dict: Dictionary with iRODS collection information. Key: file name as string (e.g.,
+        'P001-N1-DNA1-WES1.vcf.gz'); Value: list of iRODS data objects (``iRODSDataObject``).
+        :type remote_files_dict: dict
+
+        :param common_assay_path: Path common to all files; this prefix is stripped from the iRODS paths.
+        :type common_assay_path: str
+
+        :param file_patterns: List of file patterns to use for file selection. Ignored if empty.
+        :type file_patterns: list of strings
+
+        :param samples: Set of collection identifiers or substrings. Ignored if empty.
+        :type samples: set
+
+        :param substring_match: Filter by exact collection matches or by substring matches.
+        :type substring_match: bool
+
+        :return: Returns dictionary: Key: sample (collection name [str]); Value: list of iRODS objects.
+        """
+        # Initialise variables
+        filtered_dict = defaultdict(list)
+
+        # Iterate
+        for filename, irodsobjs in remote_files_dict.items():
+            for irodsobj in irodsobjs:
+                # Path needs to be stripped down to collections (=remove assay part & upwards)
+                path = PurePosixPath(irodsobj.path).relative_to(PurePosixPath(common_assay_path))
+
+                collection = path.parts[0]
+
+                # Check if collection (=1st element of stripped path) matches any of the samples
+                if samples and not substring_match:
+                    sample_match = any(s == collection for s in samples)
+                elif samples:
+                    sample_match = any(s in collection for s in samples)
+                else:
+                    sample_match = True
+
+                if not sample_match:
+                    continue
+
+                if file_patterns:
+                    file_pattern_match = any(path.match(p) for p in file_patterns)
+                else:
+                    file_pattern_match = True
+
+                if not file_pattern_match:
+                    continue
+
+                filtered_dict[collection].append(irodsobj)
+
+        return filtered_dict
+
+    def build_download_jobs(
+        self, remote_files_dict: dict[str, list[iRODSDataObject]], assay_path: str
+    ) -> list[TransferJob]:
+        """Build list of download jobs for iRODS files."""
+        # Initiate output
+        output_list = []
+        # Iterate over iRODS objects
+        for collection, irods_objects in remote_files_dict.items():
+            for irods_obj in irods_objects:
+                relpath = PurePosixPath(irods_obj.path).relative_to(PurePosixPath(assay_path))
+                coll, *subcolls, filename = relpath.parts
+                assert coll == collection
+                out_parts = {
+                    "collection": coll,
+                    "subcollections": "/".join(subcolls),
+                    "filename": filename,
+                }
+                # apply regexes
+                for filepart, m_pat, r_pat in self.args.output_regex:
+                    out_parts[filepart] = re.sub(m_pat, r_pat, out_parts[filepart])
+
+                job = TransferJob(
+                    os.path.join(
+                        self.args.output_dir, self.args.output_pattern.format(**out_parts)
+                    ),
+                    irods_obj.path,
+                    # # Unclear if this is available or not
+                    # irods_obj.size,
+                )
+                output_list.append(job)
+
+        return output_list
+
+
+def setup_argparse(parser: argparse.ArgumentParser) -> None:
+    """Setup argument parser for ``cubi-tk sodar pull-data-collection``."""
+    return PullDataCollection.setup_argparse(parser)
diff --git a/cubi_tk/snappy/retrieve_irods_collection.py b/cubi_tk/sodar_common.py
similarity index 54%
rename from cubi_tk/snappy/retrieve_irods_collection.py
rename to cubi_tk/sodar_common.py
index 57afed86..240a9803 100644
--- a/cubi_tk/snappy/retrieve_irods_collection.py
+++ b/cubi_tk/sodar_common.py
@@ -1,22 +1,24 @@
-"""Contains classes and methods used to retrieve iRODS collections from SODAR.
-"""
-from collections import defaultdict
-import typing
+from pathlib import Path
+from typing import Dict, List
 
 from irods.data_object import iRODSDataObject
 from logzero import logger
 from sodar_cli import api
 
-from ..irods.check import IrodsCheckCommand
+from .irods_common import DEFAULT_HASH_SCHEME, iRODSRetrieveCollection
 
-#: Default hash scheme. Although iRODS provides alternatives, the whole of `snappy` pipeline uses MD5.
-DEFAULT_HASH_SCHEME = "MD5"
-
-class RetrieveIrodsCollection(IrodsCheckCommand):
-    """Class retrieves iRODS Collection associated with Assay"""
-
-    def __init__(self, args, sodar_url, sodar_api_token, assay_uuid, project_uuid):
+class RetrieveSodarCollection(iRODSRetrieveCollection):
+    def __init__(
+        self,
+        sodar_url,
+        sodar_api_token,
+        assay_uuid,
+        project_uuid,
+        hash_scheme: str = DEFAULT_HASH_SCHEME,
+        ask: bool = False,
+        irods_env_path: Path = None,
+    ):
         """Constructor.
 
         :param sodar_url: SODAR url.
@@ -30,14 +32,23 @@ def __init__(self, args, sodar_url, sodar_api_token, assay_uuid, project_uuid):
 
         :param project_uuid: Project UUID.
         :type project_uuid: str
+
+        :param hash_scheme: iRODS hash scheme, default MD5.
+        :type hash_scheme: str, optional
+
+        :param ask: Confirm with user before certain actions.
+        :type ask: bool, optional
+
+        :param irods_env_path: Path to irods_environment.json
+        :type irods_env_path: pathlib.Path, optional
         """
-        IrodsCheckCommand.__init__(self, args=args)
+        super().__init__(hash_scheme, ask, irods_env_path)
         self.sodar_url = sodar_url
         self.sodar_api_token = sodar_api_token
         self.assay_uuid = assay_uuid
         self.project_uuid = project_uuid
 
-    def perform(self) -> typing.Dict[str, typing.List[iRODSDataObject]]:
+    def perform(self) -> Dict[str, List[iRODSDataObject]]:
         """Perform class routines."""
         logger.info("Starting remote files search ...")
@@ -97,51 +108,3 @@ def multi_assay_warning(assays):
             f"Project contains multiple Assays, will only consider UUID '{assays[0]}'.\n"
             f"All available UUIDs:\n{multi_assay_str}"
         )
-
-    def retrieve_irods_data_objects(
-        self, irods_path: str
-    ) -> typing.Dict[str, typing.List[iRODSDataObject]]:
-        """Retrieve data objects from iRODS.
-
-        :param irods_path: iRODS path.
-
-        :return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
-        Value: list of iRODSDataObject (native python-irodsclient object).
-        """
-
-        # Connect to iRODS
-        with self._get_irods_sessions(1) as irods_sessions:
-            try:
-                root_coll = irods_sessions[0].collections.get(irods_path)
-
-                # Get files and run checks
-                logger.info("Querying for data objects")
-
-                if root_coll is not None:
-                    irods_data_objs = self.get_data_objs(root_coll)
-                    irods_obj_dict = self.parse_irods_collection(irods_data_objs)
-                    return irods_obj_dict
-
-            except Exception as e:
-                logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
-                raise
-
-        return {}
-
-    @staticmethod
-    def parse_irods_collection(irods_data_objs) -> typing.Dict[str, typing.List[iRODSDataObject]]:
-        """Parse iRODS collection
-
-        :param irods_collection: iRODS collection.
-        :type irods_collection: dict
-
-        :return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
-        Value: list of iRODSDataObject (native python-irodsclient object).
-        """
-        # Initialise variables
-        output_dict = defaultdict(list)
-
-        for obj in irods_data_objs["files"]:
-            output_dict[obj.name].append(obj)
-
-        return output_dict
diff --git a/tests/test_irods_common.py b/tests/test_irods_common.py
index 156ae241..6398ac2b 100644
--- a/tests/test_irods_common.py
+++ b/tests/test_irods_common.py
@@ -4,7 +4,12 @@
 import irods.exception
 import pytest
 
-from cubi_tk.irods_common import TransferJob, iRODSCommon, iRODSTransfer
+from cubi_tk.irods_common import (
+    TransferJob,
+    iRODSCommon,
+    iRODSRetrieveCollection,
+    iRODSTransfer,
+)
 
 
 def test_transfer_job_bytes(fs):
@@ -167,3 +172,44 @@ def test_irods_transfer_get(mocksession, jobs):
         # download
         mockget.assert_any_call(job.path_remote, job.path_local)
     assert itransfer.size == 222
+
+
+# Test iRODSRetrieveCollection #########
+
+
+# This tests `retrieve_irods_data_objects` and by extension `parse_irods_collection`
+# A test for _irods_query would require mocking `session.query` results in a
+# way that allows creation of iRODSDataObject instances from those results
+@patch("cubi_tk.irods_common.iRODSCommon._init_irods")
+@patch("cubi_tk.irods_common.iRODSRetrieveCollection._irods_query")
+def test_irods_retrieve_data_objects(mockquery, mocksession):
+    # Possible alternative to MagicMocks here:
+    # create a fake iRODSDataObject class with a path attribute
+    mockobj1 = MagicMock()
+    mockobj1.path = "/root/coll1/file1.vcf.gz"
+    mockobj1.name = "file1.vcf.gz"
+    mockobj2 = MagicMock()
+    mockobj2.path = "/root/coll2/file2.vcf.gz"
+    mockobj2.name = "file2.vcf.gz"
+    mockobj3 = MagicMock()
+    mockobj3.path = "/root/coll1/subcol/file1.vcf.gz"
+    mockobj3.name = "file1.vcf.gz"
+
+    mockcksum = MagicMock()
+
+    mockquery.return_value = {
+        "files": [mockobj1, mockobj2, mockobj3],
+        "checksums": {
+            "/root/coll1/file1.vcf.gz": mockcksum,
+            "/root/coll2/file2.vcf.gz": mockcksum,
+            "/root/coll1/subcol/file1.vcf.gz": mockcksum,
+        },
+    }
+
+    mocksession.collections.get.return_value = "path"
+
+    data_objs = iRODSRetrieveCollection().retrieve_irods_data_objects("/fake/path")
+
+    expected_data_objs = {"file1.vcf.gz": [mockobj1, mockobj3], "file2.vcf.gz": [mockobj2]}
+
+    assert data_objs == expected_data_objs
diff --git a/tests/test_sodar_pull_data_collection.py b/tests/test_sodar_pull_data_collection.py
new file mode 100644
index 00000000..6cade9ca
--- /dev/null
+++ b/tests/test_sodar_pull_data_collection.py
@@ -0,0 +1,179 @@
+from copy import deepcopy
+import pathlib
+import re
+from unittest.mock import MagicMock
+
+import pytest
+
+from cubi_tk.__main__ import setup_argparse
+from cubi_tk.irods_common import TransferJob
+from cubi_tk.sodar.pull_data_collection import PullDataCollection
+
+
+class MockDataObject:
+    def __init__(self, path):
+        self.path = path
+
+    def __eq__(self, other):
+        return self.path == other.path
+
+    def __repr__(self):
+        return f"MockDataObject(path={self.path})"
+
+
+@pytest.fixture
+def filtered_data_objects():
+    return {
+        "coll1-N1-DNA1": [
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol1/file1.vcf.gz"),
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol2/file1.vcf.gz"),
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol1/miscFile.txt"),
+        ],
+        "coll2-N1-DNA1": [
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/file2.vcf.gz"),
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/file2.bam"),
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/miscFile.txt"),
+        ],
+    }
+
+
+def test_filter_irods_collection(filtered_data_objects):
+    fake_irods_data_dict = {
+        "file1.vcf.gz": [
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol1/file1.vcf.gz"),
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol2/file1.vcf.gz"),
+        ],
+        "file2.vcf.gz": [
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/file2.vcf.gz"),
+        ],
+        "file2.bam": [
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/file2.bam"),
+        ],
+        "miscFile.txt": [
+            MockDataObject(path="/irods/project/coll1-N1-DNA1/subcol1/miscFile.txt"),
+            MockDataObject(path="/irods/project/coll2-N1-DNA1/subcol1/miscFile.txt"),
+        ],
+    }
+
+    kwarg_list = [
+        # No filters at all -> all files
+        {"file_patterns": [], "samples": [], "substring_match": False},
+        # Test filepattern filter works
+        {"file_patterns": ["*.vcf.gz"], "samples": [], "substring_match": False},
+        # Test file pattern with multiple patterns, also **/*.X & *.Y
+        {"file_patterns": ["*.vcf.gz", "**/*.txt"], "samples": [], "substring_match": False},
+        # Test Sample/Collection filter works
+        {"file_patterns": [], "samples": ["coll1-N1-DNA1"], "substring_match": False},
+        # Test substring matching works
+        {"file_patterns": [], "samples": ["coll1"], "substring_match": True},
+    ]
+
+    expected_results = [
+        deepcopy(filtered_data_objects),
+        {
+            k: [v for v in l if v.path.endswith("vcf.gz")]
+            for k, l in deepcopy(filtered_data_objects).items()
+        },
+        {
+            k: [v for v in l if not v.path.endswith("bam")]
+            for k, l in deepcopy(filtered_data_objects).items()
+        },
+        {k: l for k, l in deepcopy(filtered_data_objects).items() if k == "coll1-N1-DNA1"},
+        {k: l for k, l in deepcopy(filtered_data_objects).items() if k == "coll1-N1-DNA1"},
+    ]
+
+    for kwargs, expected in zip(kwarg_list, expected_results):
+        result = PullDataCollection.filter_irods_file_list(
+            fake_irods_data_dict, "/irods/project", **kwargs
+        )
+        assert result == expected
+
+
+def test_build_download_jobs(filtered_data_objects):
+    mockargs = MagicMock()
+    mockargs.output_dir = "/path/to/output"
+    mockargs.output_regex = []  # ['', '', '']
+    mockargs.output_pattern = "{collection}/{subcollections}/{filename}"
+
+    testinstance = PullDataCollection(mockargs)
+
+    expected_out = [
+        TransferJob(
+            path_remote=obj.path, path_local=obj.path.replace("/irods/project", "/path/to/output")
+        )
+        for k, l in filtered_data_objects.items()
+        for obj in l
+    ]
+    out = testinstance.build_download_jobs(filtered_data_objects, "/irods/project")
+    assert out == expected_out
+
+    # Test with different output pattern
+    mockargs.output_pattern = "{collection}/{filename}"
+    expected_out = [
+        TransferJob(
+            path_remote=obj.path,
+            path_local=re.sub(
+                "/subcol[12]", "", obj.path.replace("/irods/project", "/path/to/output")
+            ),
+        )
+        for k, l in filtered_data_objects.items()
+        for obj in l
+    ]
+    out = testinstance.build_download_jobs(filtered_data_objects, "/irods/project")
+    assert out == expected_out
+
+    # Test with regex
+    mockargs.output_regex = [
+        ["subcollections", "subcol", "subcollection"],
+        ["collection", "-N1-DNA1", ""],
+    ]
+    mockargs.output_pattern = "{collection}/{subcollections}/{filename}"
+    expected_out = [
+        TransferJob(
+            path_remote=obj.path,
+            path_local=obj.path.replace("/irods/project", "/path/to/output")
+            .replace("subcol", "subcollection")
+            .replace("-N1-DNA1", ""),
+        )
+        for k, l in filtered_data_objects.items()
+        for obj in l
+    ]
+    out = testinstance.build_download_jobs(filtered_data_objects, "/irods/project")
+    assert out == expected_out
+
+
+def test_parse_samplesheet():
+    # Test on Biomedsheet
+    samples = PullDataCollection.parse_sample_tsv(
+        pathlib.Path(__file__).resolve().parent / "data" / "pull_sheets" / "sheet.tsv",
+        sample_col=2,
+        skip_rows=12,
+    )
+    assert samples == {"index", "mother", "father"}
+
+
+def test_run_sodar_pull_data_collection_help(capsys):
+    """Test ``cubi-tk sodar pull-data-collection --help``"""
+    parser, _subparsers = setup_argparse()
+    with pytest.raises(SystemExit) as e:
+        parser.parse_args(["sodar", "pull-data-collection", "--help"])
+
+    assert e.value.code == 0
+
+    res = capsys.readouterr()
+    assert res.out
+    assert not res.err
+
+
+def test_run_sodar_pull_data_collection_nothing(capsys):
+    """Test ``cubi-tk sodar pull-data-collection``"""
+    parser, _subparsers = setup_argparse()
+
+    with pytest.raises(SystemExit) as e:
+        parser.parse_args(["sodar", "pull-data-collection"])
+
+    assert e.value.code == 2
+
+    res = capsys.readouterr()
+    assert not res.out
+    assert res.err
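
---

Usage sketch (editorial addendum, not part of the patch): the refactoring splits downloading into two steps. RetrieveSodarCollection resolves the project's iRODS data objects via the SODAR API, and iRODSTransfer downloads them. Below is a minimal sketch of wiring the two together, assuming a valid SODAR API token and a working iRODS environment; the URL, token, UUID, and output paths are placeholders, not values from the patch.

    from cubi_tk.irods_common import TransferJob, iRODSTransfer
    from cubi_tk.sodar_common import RetrieveSodarCollection

    # Query SODAR for the project's assay collection and list its data objects.
    # perform() returns a dict mapping file name -> list of iRODSDataObject.
    remote_files = RetrieveSodarCollection(
        "https://sodar.example.com/",  # placeholder SODAR URL
        "<sodar-api-token>",           # placeholder API token
        None,                          # assay UUID (None: let the class determine the assay)
        "<project-uuid>",              # placeholder project UUID
    ).perform()

    # Pair each remote object with a local output path; TransferJob takes
    # (path_local, path_remote). The flat "out/" layout here is only for
    # illustration; PullDataCollection.build_download_jobs() derives
    # structured paths from --output-pattern instead.
    jobs = [
        TransferJob(f"out/{name}", obj.path)
        for name, objs in remote_files.items()
        for obj in objs
    ]

    # Download; with force_overwrite=False, files that already exist locally are skipped.
    iRODSTransfer(jobs).get(force_overwrite=False)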