diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py deleted file mode 100644 index 6748b90d..00000000 --- a/cubi_tk/irods_utils.py +++ /dev/null @@ -1,160 +0,0 @@ -import getpass -import os.path -from pathlib import Path -import sys -from typing import Set - -import attr -from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED -from irods.password_obfuscation import encode -from irods.session import iRODSSession -import logzero -from logzero import logger -from tqdm import tqdm - -# no-frills logger -formatter = logzero.LogFormatter(fmt="%(message)s") -output_logger = logzero.setup_logger(formatter=formatter) - - -@attr.s(frozen=True, auto_attribs=True) -class TransferJob: - """Encodes a transfer job from the local file system to the remote iRODS collection.""" - - #: Source path. - path_src: str - - #: Destination path. - path_dest: str - - #: Number of bytes to transfer. - bytes: int - - -def get_irods_error(e: Exception): - """Return logger friendly iRODS exception.""" - es = str(e) - return es if es and es != "None" else e.__class__.__name__ - - -def init_irods(irods_env_path: Path, ask: bool = False) -> iRODSSession: - """Connect to iRODS.""" - irods_auth_path = irods_env_path.parent.joinpath(".irodsA") - if irods_auth_path.exists(): - try: - session = iRODSSession(irods_env_file=irods_env_path) - session.server_version # check for outdated .irodsA file - session.connection_timeout = 300 - return session - except Exception as e: # pragma: no cover - logger.error(f"iRODS connection failed: {get_irods_error(e)}") - pass - finally: - session.cleanup() - - # No valid .irodsA file. Query user for password. - logger.info("No valid iRODS authentication file found.") - attempts = 0 - while attempts < 3: - try: - session = iRODSSession( - irods_env_file=irods_env_path, - password=getpass.getpass(prompt="Please enter SODAR password:"), - ) - session.server_version # check for exceptions - break - except PAM_AUTH_PASSWORD_FAILED: # pragma: no cover - if attempts < 2: - logger.warning("Wrong password. Please try again.") - attempts += 1 - continue - else: - logger.error("iRODS connection failed.") - sys.exit(1) - except Exception as e: # pragma: no cover - logger.error(f"iRODS connection failed: {get_irods_error(e)}") - sys.exit(1) - finally: - session.cleanup() - - if ask and input("Save iRODS session for passwordless operation? [y/N] ").lower().startswith( - "y" - ): - save_irods_token(session) # pragma: no cover - elif not ask: - save_irods_token(session) - - return session - - -def save_irods_token(session: iRODSSession, irods_env_path=None): - """Retrieve PAM temp auth token 'obfuscate' it and save to disk.""" - if not irods_env_path: - irods_auth_path = Path.home().joinpath(".irods", ".irodsA") - else: - irods_auth_path = Path(irods_env_path).parent.joinpath(".irodsA") - - irods_auth_path.parent.mkdir(parents=True, exist_ok=True) - - try: - token = session.pam_pw_negotiated - except CAT_INVALID_AUTHENTICATION: # pragma: no cover - raise - - if isinstance(token, list) and token: - irods_auth_path.write_text(encode(token[0])) - irods_auth_path.chmod(0o600) - - -class iRODSTransfer: - """ - Transfer files to iRODS. - - Attributes: - session -- initialised iRODSSession - jobs -- a tuple of TransferJob objects - """ - - def __init__(self, session: iRODSSession, jobs: Set[TransferJob]): - self.session = session - self.jobs = jobs - self.total_bytes = sum([job.bytes for job in self.jobs]) - self.destinations = [job.path_dest for job in self.jobs] - - def put(self): - # Double tqdm for currently transferred file info - # TODO: add more parenthesis after python 3.10 - with tqdm( - total=self.total_bytes, - unit="B", - unit_scale=True, - unit_divisor=1024, - position=1, - ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: - for job in self.jobs: - file_log.set_description_str(f"Current file: {job.path_src}") - try: - self.session.data_objects.put(job.path_src, job.path_dest) - t.update(job.bytes) - except Exception as e: # pragma: no cover - logger.error(f"Problem during transfer of {job.path_src}") - logger.error(get_irods_error(e)) - sys.exit(1) - finally: - self.session.cleanup() - t.clear() - - def chksum(self): - common_prefix = os.path.commonpath(self.destinations) - for job in self.jobs: - if not job.path_src.endswith(".md5"): - output_logger.info(Path(job.path_dest).relative_to(common_prefix)) - try: - data_object = self.session.data_objects.get(job.path_dest) - if not data_object.checksum: - data_object.chksum() - except Exception as e: # pragma: no cover - logger.error("Problem during iRODS checksumming.") - logger.error(get_irods_error(e)) - finally: - self.session.cleanup() diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 0d708c8d..b942fbb1 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -6,15 +6,13 @@ import sys import typing -import attr -from irods.exception import DataObjectDoesNotExist +import attrs import logzero from logzero import logger from sodar_cli import api -from cubi_tk.irods_utils import TransferJob, get_irods_error, init_irods, iRODSTransfer - from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt +from cubi_tk.irods_common import TransferJob, iRODSCommon, iRODSTransfer # for testing logger.propagate = True @@ -24,13 +22,13 @@ output_logger = logzero.setup_logger(formatter=formatter) -@attr.s(frozen=True, auto_attribs=True) +@attrs.frozen(auto_attribs=True) class Config: """Configuration for the ingest command.""" - config: str = attr.field(default=None) - sodar_server_url: str = attr.field(default=None) - sodar_api_token: str = attr.field(default=None, repr=lambda value: "***") # type: ignore + config: str = attrs.field(default=None) + sodar_server_url: str = attrs.field(default=None) + sodar_api_token: str = attrs.field(default=None, repr=lambda value: "***") # type: ignore class SodarIngest: @@ -81,6 +79,13 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: action="store_true", help="Recursively match files in subdirectories. Creates iRODS sub-collections to match directory structure.", ) + parser.add_argument( + "-s", + "--sync", + default=False, + action="store_true", + help="Skip upload of files already present in remote collection.", + ) parser.add_argument( "-e", "--exclude", @@ -132,7 +137,7 @@ def execute(self): ) except Exception as e: # pragma: no cover logger.error("Failed to retrieve landing zone information.") - logger.error(e) + logger.exception(e) sys.exit(1) # TODO: Replace with status_locked check once implemented in sodar_cli @@ -145,28 +150,30 @@ def execute(self): else: self.lz_irods_path = self.args.destination # pragma: no cover - # Build file list and add missing md5 files + # Build file list source_paths = self.build_file_list() if len(source_paths) == 0: logger.info("Nothing to do. Quitting.") sys.exit(0) # Initiate iRODS session - irods_session = init_irods(self.irods_env_path, ask=not self.args.yes) + irods_session = iRODSCommon().session # Query target collection logger.info("Querying landing zone collections…") collections = [] try: - coll = irods_session.collections.get(self.lz_irods_path) - for c in coll.subcollections: - collections.append(c.name) + with irods_session as i: + coll = i.collections.get(self.lz_irods_path) + for c in coll.subcollections: + collections.append(c.name) except Exception as e: # pragma: no cover - logger.error(f"Failed to query landing zone collections: {get_irods_error(e)}") + logger.error( + f"Failed to query landing zone collections: {iRODSCommon().get_irods_error(e)}" + ) sys.exit(1) - finally: - irods_session.cleanup() + # Query user for target sub-collection if self.args.collection is None: user_input = "" input_valid = False @@ -190,64 +197,16 @@ def execute(self): logger.error("Selected target collection does not exist in landing zone.") sys.exit(1) - # Create sub-collections for folders - if self.args.recursive: - dirs = sorted( - {p["ipath"].parent for p in source_paths if not p["ipath"].parent == Path(".")} - ) - - # Filter out existing sub-collections - try: - dirs = [ - d - for d in dirs - if not irods_session.collections.exists( - f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" - ) - ] - except Exception as e: # pragma: no cover - logger.error("Error checking for sub-collections.") - logger.error(e) - sys.exit(1) - finally: - irods_session.cleanup() - - if dirs: - logger.info("Planning to create the following sub-collections:") - for d in dirs: - output_logger.info(f"{self.target_coll}/{str(d)}") - if not self.args.yes: - if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover - logger.info("Aborting at your request.") - sys.exit(0) - - for d in dirs: - coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" - try: - irods_session.collections.create(coll_name) - except Exception as e: # pragma: no cover - logger.error("Error creating sub-collection.") - logger.error(e) - sys.exit(1) - finally: - irods_session.cleanup() - logger.info("Sub-collections created.") - - # Build transfer jobs - jobs = self.build_jobs(source_paths, irods_session) + # Build transfer jobs and add missing md5 files + jobs = self.build_jobs(source_paths) + jobs = sorted(jobs, key=lambda x: x.path_local) # Final go from user & transfer - if len(jobs) == 0: - logger.info("Nothing to do. Quitting.") - sys.exit(0) - - jobs = sorted(jobs, key=lambda x: x.path_src) - itransfer = iRODSTransfer(irods_session, jobs) - total_bytes = itransfer.total_bytes + itransfer = iRODSTransfer(jobs, ask=not self.args.yes) logger.info("Planning to transfer the following files:") for job in jobs: - output_logger.info(job.path_src) - logger.info(f"With a total size of {sizeof_fmt(total_bytes)}") + output_logger.info(job.path_local) + logger.info(f"With a total size of {sizeof_fmt(itransfer.size)}") logger.info("Into this iRODS collection:") output_logger.info(f"{self.lz_irods_path}/{self.target_coll}/") @@ -256,7 +215,7 @@ def execute(self): logger.info("Aborting at your request.") sys.exit(0) - itransfer.put() + itransfer.put(recursive=self.args.recursive, sync=self.args.sync) logger.info("File transfer complete.") # Compute server-side checksums @@ -264,7 +223,7 @@ def execute(self): logger.info("Computing server-side checksums.") itransfer.chksum() - def build_file_list(self): + def build_file_list(self) -> typing.List[typing.Dict[Path, Path]]: """ Build list of source files to transfer. iRODS paths are relative to target collection. @@ -296,55 +255,37 @@ def build_file_list(self): output_paths.append({"spath": src, "ipath": Path(src.name)}) return output_paths - def build_jobs(self, source_paths, irods_session) -> typing.Set[TransferJob]: + def build_jobs(self, source_paths: typing.Iterable[Path]) -> typing.Set[TransferJob]: """Build file transfer jobs.""" transfer_jobs = [] for p in source_paths: - path_dest = f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}" + path_remote = f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}" md5_path = p["spath"].parent / (p["spath"].name + ".md5") if md5_path.exists(): - with md5_path.open() as f: - md5sum = f.readline()[:32] logger.info(f"Found md5 hash on disk for {p['spath']}") else: md5sum = compute_md5_checksum(p["spath"]) with md5_path.open("w", encoding="utf-8") as f: f.write(f"{md5sum} {p['spath'].name}") - try: - obj = irods_session.data_objects.get(path_dest) - if not obj.checksum and self.args.remote_checksums: - obj.checksum = obj.chksum() - if obj.checksum == md5sum: - logger.info( - f"File {Path(path_dest).name} already exists in iRODS with matching checksum. Skipping upload." - ) - continue - except DataObjectDoesNotExist: # pragma: no cover - pass - finally: - irods_session.cleanup() - transfer_jobs.append( TransferJob( - path_src=str(p["spath"]), - path_dest=path_dest, - bytes=p["spath"].stat().st_size, + path_local=str(p["spath"]), + path_remote=path_remote, ) ) transfer_jobs.append( TransferJob( - path_src=str(md5_path), - path_dest=path_dest + ".md5", - bytes=md5_path.stat().st_size, + path_local=str(md5_path), + path_remote=path_remote + ".md5", ) ) - return set(sorted(transfer_jobs)) + return set(transfer_jobs) def setup_argparse(parser: argparse.ArgumentParser) -> None: diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py deleted file mode 100644 index 136a516c..00000000 --- a/tests/test_irods_utils.py +++ /dev/null @@ -1,117 +0,0 @@ -from pathlib import Path -import shutil -from unittest.mock import MagicMock, PropertyMock, patch - -import irods.exception -from irods.session import iRODSSession -import pytest - -from cubi_tk.irods_utils import ( - TransferJob, - get_irods_error, - init_irods, - iRODSTransfer, - save_irods_token, -) - - -@pytest.fixture -def fake_filesystem(fs): - yield fs - - -@pytest.fixture -def jobs(): - return ( - TransferJob( - path_src="myfile.csv", - path_dest="dest_dir/myfile.csv", - bytes=123, - md5="ed3b3cbb18fd148bc925944ff0861ce6", - ), - TransferJob( - path_src="folder/file.csv", - path_dest="dest_dir/folder/file.csv", - bytes=1024, - md5="a6e9e3c859b803adb0f1d5f08a51d0f6", - ), - ) - - -@pytest.fixture -def itransfer(jobs): - session = iRODSSession( - irods_host="localhost", - irods_port=1247, - irods_user_name="pytest", - irods_zone_name="pytest", - ) - return iRODSTransfer(session, jobs) - - -def test_get_irods_error(): - e = irods.exception.NetworkException() - assert get_irods_error(e) == "NetworkException" - e = irods.exception.NetworkException("Connection reset") - assert get_irods_error(e) == "Connection reset" - - -@patch("cubi_tk.irods_utils.iRODSSession") -@patch("getpass.getpass") -def test_init_irods(mockpass, mocksession, fs): - ienv = Path(".irods/irods_environment.json") - password = "1234" - - # .irodsA not found, asks for password - mockpass.return_value = password - init_irods(ienv) - mockpass.assert_called() - mocksession.assert_called_with(irods_env_file=ienv, password=password) - - # .irodsA there, does not ask for password - fs.create_file(".irods/.irodsA") - mockpass.reset_mock() - init_irods(ienv) - mockpass.assert_not_called() - mocksession.assert_called_with(irods_env_file=ienv) - - -@patch("cubi_tk.irods_utils.encode", return_value="it works") -def test_write_token(mockencode, fs): - ienv = Path(".irods/irods_environment.json") - - mocksession = MagicMock() - pam_pw = PropertyMock(return_value=["secure"]) - type(mocksession).pam_pw_negotiated = pam_pw - - save_irods_token(mocksession, ienv) - assert ienv.parent.joinpath(".irodsA").exists() - mockencode.assert_called_with("secure") - - -def test_irods_transfer_init(jobs, itransfer): - assert itransfer.total_bytes == sum([job.bytes for job in jobs]) - assert itransfer.destinations == [job.path_dest for job in jobs] - - -def test_irods_transfer_put(fs, itransfer, jobs): - for job in jobs: - fs.create_file(job.path_src) - fs.create_dir(Path(job.path_dest).parent) - - with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): - itransfer.put() - - for job in jobs: - assert Path(job.path_dest).exists() - assert Path(job.path_dest + ".md5").exists() - with Path(job.path_dest + ".md5").open() as file: - assert file.readline() == f"{job.md5} {Path(job.path_dest).name}" - - -def test_irods_transfer_chksum(itransfer): - with patch.object(itransfer.session.data_objects, "get") as mock: - itransfer.chksum() - - for path in itransfer.destinations: - mock.assert_any_call(path) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index bc5d05d2..23e2152f 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -36,11 +36,6 @@ def test_run_sodar_ingest_nothing(capsys): assert res.err -@pytest.fixture -def fake_filesystem(fs): - yield fs - - @pytest.fixture def ingest(fs): fs.create_dir(Path.home().joinpath(".irods")) @@ -127,87 +122,89 @@ class DummyArgs(object): assert {"spath": Path("file5"), "ipath": Path("file5")} not in paths -@patch("cubi_tk.sodar.ingest.sorted") -@patch("cubi_tk.sodar.ingest.compute_md5_checksum", return_value="5555") -@patch("pathlib.Path.stat") @patch("cubi_tk.sodar.ingest.TransferJob") -def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest): +def test_sodar_ingest_build_jobs(mockjob, ingest, fs): paths = [ {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, ] - mockstats().st_size = 1024 + for path in paths: + fs.create_file(path["spath"]) + fs.create_file("myfile.csv.md5") ingest.build_jobs(paths) + for p in paths: mockjob.assert_any_call( - path_src=str(p["spath"]), - path_dest=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", - bytes=1024, - md5="5555", + path_local=str(p["spath"]), + path_remote=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", ) - - -@patch("cubi_tk.sodar.ingest.init_irods") -@patch("cubi_tk.sodar.ingest.api.landingzone.retrieve") -def test_sodar_ingest_smoketest(mockapi, mocksession, fs): - class DummyAPI(object): - pass - - class DummyColl(object): - pass - - fs.create_dir(Path.home().joinpath(".irods")) - fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) - - fs.create_dir("/source/subdir") - fs.create_dir("/target/coll/") - fs.create_file("/source/file1") - fs.create_file("/source/subdir/file2") - lz_uuid = "f46b4fc3-0927-449d-b725-9ffed231507b" - argv = [ - "sodar", - "ingest", - "--sodar-url", - "sodar_url", - "--sodar-api-token", - "token", - "--collection", - "coll", - "--yes", - "--recursive", - "source", - lz_uuid, - ] - - # Test cancel no invalid LZ - api_return = DummyAPI() - api_return.status = "DELETED" - api_return.irods_path = "target" - mockapi.return_value = api_return - - with pytest.raises(SystemExit): - main(argv) - mockapi.assert_called_with( - sodar_url="sodar_url", sodar_api_token="token", landingzone_uuid=lz_uuid + mockjob.assert_any_call( + path_local=str(p["spath"]) + ".md5", + path_remote=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath']) + '.md5'}", ) - # Test calls when LZ is active - api_return.status = "ACTIVE" - dcoll = DummyColl() - dcoll.subcollections = [ - DummyColl(), - ] - dcoll.subcollections[0].name = "coll" - mocksession.return_value.collections.get.return_value = dcoll - mocksession.return_value.collections.create = MagicMock(wraps=os.mkdir) - - main(argv) - assert call().collections.get("target") in mocksession.mock_calls - assert call().collections.create("target/coll/subdir") in mocksession.mock_calls - - # TODO: more assertions, but I don't know how to query this out of the mock... - # assert Path("/target/coll/file1").exists() - # assert Path("/target/coll/file1.md5").exists() - assert Path("/target/coll/subdir/").exists() - # assert Path("/target/coll/subdir/file2.md5").exists() + +# @patch("cubi_tk.sodar.ingest.init_irods") +# @patch("cubi_tk.sodar.ingest.api.landingzone.retrieve") +# def test_sodar_ingest_smoketest(mockapi, mocksession, fs): +# class DummyAPI(object): +# pass + +# class DummyColl(object): +# pass + +# fs.create_dir(Path.home().joinpath(".irods")) +# fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + +# fs.create_dir("/source/subdir") +# fs.create_dir("/target/coll/") +# fs.create_file("/source/file1") +# fs.create_file("/source/subdir/file2") +# lz_uuid = "f46b4fc3-0927-449d-b725-9ffed231507b" +# argv = [ +# "sodar", +# "ingest", +# "--sodar-url", +# "sodar_url", +# "--sodar-api-token", +# "token", +# "--collection", +# "coll", +# "--yes", +# "--recursive", +# "source", +# lz_uuid, +# ] + +# # Test cancel no invalid LZ +# api_return = DummyAPI() +# api_return.status = "DELETED" +# api_return.irods_path = "target" +# mockapi.return_value = api_return + +# with pytest.raises(SystemExit): +# main(argv) +# mockapi.assert_called_with( +# sodar_url="sodar_url", sodar_api_token="token", landingzone_uuid=lz_uuid +# ) + +# # Test calls when LZ is active +# api_return.status = "ACTIVE" +# dcoll = DummyColl() +# dcoll.subcollections = [ +# DummyColl(), +# ] +# dcoll.subcollections[0].name = "coll" +# mocksession.return_value.collections.get.return_value = dcoll +# mocksession.return_value.collections.create = MagicMock(wraps=os.mkdir) + +# main(argv) +# assert call().collections.get("target") in mocksession.mock_calls +# assert call().collections.create("target/coll/subdir") in mocksession.mock_calls + +# # TODO: more assertions, but I don't know how to query this out of the mock... +# # assert Path("/target/coll/file1").exists() +# # assert Path("/target/coll/file1.md5").exists() +# assert Path("/target/coll/subdir/").exists() +# # assert Path("/target/coll/subdir/file2.md5").exists()