diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e813c37..889988c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -3,6 +3,13 @@
 Change Log
 ==========
 
+3.11.0
+======
+* Most MetaWorkflowRun creation functions now accept multiple files or filesets as input.
+* Added `archive_broad_crams_and_fastqs` function to `wrangler_utils` to archive Broad submissions more easily.
+* Added `purge_fileset` function to `wrangler_utils` to delete a fileset and its associated files.
+* Added `remove_property` function to `wrangler_utils` to remove properties from items.
+
 3.10.0
 =====
 * Added functionality to rerun a MetaWorkflowRun with imported steps from a previous run
diff --git a/magma_smaht/commands/create_meta_workflow_run.py b/magma_smaht/commands/create_meta_workflow_run.py
index 986df09..14b1696 100644
--- a/magma_smaht/commands/create_meta_workflow_run.py
+++ b/magma_smaht/commands/create_meta_workflow_run.py
@@ -357,7 +357,12 @@ def qc_long_read_bam(file_accessions, replace_qc, auth_env):
 @cli.command()
 @click.help_option("--help", "-h")
 @click.option(
-    "-f", "--fileset-accession", required=True, type=str, help="Fileset accession"
+    "-f",
+    "--fileset-accessions",
+    required=True,
+    multiple=True,
+    type=str,
+    help="Fileset accession(s)",
 )
 @click.option(
     "-e",
@@ -366,16 +371,23 @@ def qc_long_read_bam(file_accessions, replace_qc, auth_env):
     type=str,
     help="Name of environment in smaht-keys file",
 )
-def conversion_cram_to_fastq(fileset_accession, auth_env):
+def conversion_cram_to_fastq(fileset_accessions, auth_env):
     """Conversion MWFR for CRAM to FASTQ (paired-end)"""
     smaht_key = get_auth_key(auth_env)
-    mwfr_cram_to_fastq_paired_end(fileset_accession, smaht_key)
+    for fileset_accession in fileset_accessions:
+        print(f"Working on FileSet {fileset_accession}")
+        mwfr_cram_to_fastq_paired_end(fileset_accession, smaht_key)
 
 
 @cli.command()
 @click.help_option("--help", "-h")
 @click.option(
-    "-f", "--fileset-accession", required=True, type=str, help="Fileset accession"
+    "-f",
+    "--fileset-accessions",
+    required=True,
+    multiple=True,
+    type=str,
+    help="Fileset accession(s)",
 )
 @click.option(
     "-e",
@@ -384,10 +396,12 @@ def conversion_cram_to_fastq(fileset_accession, auth_env):
     type=str,
     help="Name of environment in smaht-keys file",
 )
-def conversion_bam_to_fastq(fileset_accession, auth_env):
+def conversion_bam_to_fastq(fileset_accessions, auth_env):
     """Conversion MWFR for BAM to FASTQ (paired-end)"""
     smaht_key = get_auth_key(auth_env)
-    mwfr_bam_to_fastq_paired_end(fileset_accession, smaht_key)
+    for fileset_accession in fileset_accessions:
+        print(f"Working on FileSet {fileset_accession}")
+        mwfr_bam_to_fastq_paired_end(fileset_accession, smaht_key)
 
 
 @cli.command()
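The conversion commands above now take a repeatable -f flag: click's
multiple=True collects every occurrence into a tuple, and the command creates
one MetaWorkflowRun per fileset. A minimal sketch of the new calling
convention using click's test runner; the accessions and the "data"
environment name are hypothetical:

    # Sketch only: exercises the repeatable -f flag of the updated command.
    # A real invocation needs valid credentials in the smaht-keys file.
    from click.testing import CliRunner

    from magma_smaht.commands.create_meta_workflow_run import cli

    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            "conversion-cram-to-fastq",  # click derives the name from the function
            "-f", "SMAFS000AAA",         # hypothetical fileset accession
            "-f", "SMAFS000BBB",         # hypothetical fileset accession
            "-e", "data",                # environment name in the smaht-keys file
        ],
    )
    print(result.output)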
diff --git a/magma_smaht/commands/wrangler_utils.py b/magma_smaht/commands/wrangler_utils.py
index d97e435..89d9236 100644
--- a/magma_smaht/commands/wrangler_utils.py
+++ b/magma_smaht/commands/wrangler_utils.py
@@ -87,10 +87,17 @@ def reset_mwfrs(mwfr_uuids, auth_env):
     type=str,
     help="Name of environment in smaht-keys file",
 )
-def reset_all_failed_mwfrs(auth_env):
+@click.option(
+    "--ignore-md5",
+    default=False,
+    is_flag=True,
+    show_default=True,
+    help="Ignore MD5 checksum runs",
+)
+def reset_all_failed_mwfrs(auth_env, ignore_md5):
     """Reset all failed MetaWorkflowRuns on the portal"""
     smaht_key = get_auth_key(auth_env)
-    wrangler_utils.reset_all_failed_mwfrs(smaht_key)
+    wrangler_utils.reset_all_failed_mwfrs(smaht_key, ignore_md5)
 
 
 @cli.command()
@@ -270,6 +277,42 @@ def archive_unaligned_reads(fileset_accessions, dry_run, auth_env):
         wrangler_utils.archive_unaligned_reads(f, dry_run, smaht_key)
 
 
+@cli.command()
+@click.help_option("--help", "-h")
+@click.option(
+    "-f",
+    "--fileset-accessions",
+    required=True,
+    type=str,
+    multiple=True,
+    help="Fileset accessions",
+)
+@click.option(
+    "-d",
+    "--dry-run",
+    default=False,
+    is_flag=True,
+    show_default=True,
+    help="Dry run",
+)
+@click.option(
+    "-e",
+    "--auth-env",
+    required=True,
+    type=str,
+    help="Name of environment in smaht-keys file",
+)
+def archive_broad_crams_and_fastqs(fileset_accessions, dry_run, auth_env):
+    """
+    Archive (submitted) CRAMs and the associated FASTQs from the conversion.
+    Every such file in the fileset will receive the s3_lifecycle_category=long_term_archive.
+    """
+    smaht_key = get_auth_key(auth_env)
+    for f in fileset_accessions:
+        print(f"Working on Fileset {f}")
+        wrangler_utils.archive_broad_crams_and_fastqs(f, dry_run, smaht_key)
+
+
 @cli.command()
 @click.help_option("--help", "-h")
 @click.option(
@@ -294,6 +337,64 @@ def sample_identity_check_status(num_files, auth_env):
     wrangler_utils.sample_identity_check_status(num_files, smaht_key)
 
 
+@cli.command()
+@click.help_option("--help", "-h")
+@click.option(
+    "-f",
+    "--fileset-accessions",
+    required=True,
+    type=str,
+    multiple=True,
+    help="Fileset accessions",
+)
+@click.option(
+    "-d",
+    "--dry-run",
+    default=False,
+    is_flag=True,
+    show_default=True,
+    help="Dry run",
+)
+@click.option(
+    "--delete-submitted-files",
+    default=False,
+    is_flag=True,
+    show_default=True,
+    help="Delete submitted files",
+)
+@click.option(
+    "--delete-fileset",
+    default=False,
+    is_flag=True,
+    show_default=True,
+    help="Delete fileset",
+)
+@click.option(
+    "-e",
+    "--auth-env",
+    required=True,
+    type=str,
+    help="Name of environment in smaht-keys file",
+)
+def purge_fileset(
+    fileset_accessions, dry_run, delete_submitted_files, delete_fileset, auth_env
+):
+    """
+    Delete all files in a fileset, delete associated MetaWorkflowRuns (incl. files)
+    and remove the fileset from the portal. Use with caution!
+    """
+    smaht_key = get_auth_key(auth_env)
+    for fileset_accession in fileset_accessions:
+        print(f"Working on Fileset {fileset_accession}")
+        wrangler_utils.purge_fileset(
+            fileset_accession,
+            dry_run,
+            delete_submitted_files,
+            delete_fileset,
+            smaht_key,
+        )
+
+
 @cli.command()
 @click.help_option("--help", "-h")
 @click.option(
@@ -330,5 +431,34 @@ def set_property(identifier, property_key, property_value, auth_env):
     wrangler_utils.set_property(identifier, property_key, property_value, smaht_key)
 
 
+@cli.command()
+@click.help_option("--help", "-h")
+@click.option(
+    "-i",
+    "--identifier",
+    required=True,
+    type=str,
+    help="Item UUID",
+)
+@click.option(
+    "-p",
+    "--property",
+    required=True,
+    type=str,
+    help="Item property",
+)
+@click.option(
+    "-e",
+    "--auth-env",
+    required=True,
+    type=str,
+    help="Name of environment in smaht-keys file",
+)
+def remove_property(identifier, property, auth_env):
+    """Remove item property by uuid."""
+    smaht_key = get_auth_key(auth_env)
+    wrangler_utils.remove_property(identifier, property, smaht_key)
+
+
 if __name__ == "__main__":
     cli()
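A sensible workflow for the new archive command is a dry run first, then the
real invocation. Hedged sketch via click's test runner; the accession and
environment name are made up:

    # Preview, then apply. Both calls hit the portal configured in smaht-keys.
    from click.testing import CliRunner

    from magma_smaht.commands.wrangler_utils import cli

    runner = CliRunner()
    base_args = ["archive-broad-crams-and-fastqs", "-f", "SMAFS000CCC", "-e", "data"]

    # 1) --dry-run only reports how many CRAMs/FASTQs would be patched.
    print(runner.invoke(cli, base_args + ["--dry-run"]).output)

    # 2) Real run: sets s3_lifecycle_category=long_term_archive on each file.
    print(runner.invoke(cli, base_args).output)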
+ """ + smaht_key = get_auth_key(auth_env) + for fileset_accession in fileset_accessions: + print(f"Working on Fileset {fileset_accession}") + wrangler_utils.purge_fileset( + fileset_accession, + dry_run, + delete_submitted_files, + delete_fileset, + smaht_key, + ) + + @cli.command() @click.help_option("--help", "-h") @click.option( @@ -330,5 +431,34 @@ def set_property(identifier, property_key, property_value, auth_env): wrangler_utils.set_property(identifier, property_key, property_value, smaht_key) +@cli.command() +@click.help_option("--help", "-h") +@click.option( + "-i", + "--identifier", + required=True, + type=str, + help="Item UUID", +) +@click.option( + "-p", + "--property", + required=True, + type=str, + help="Item property", +) +@click.option( + "-e", + "--auth-env", + required=True, + type=str, + help="Name of environment in smaht-keys file", +) +def remove_property(identifier, property, auth_env): + """Remove item property by uuid.""" + smaht_key = get_auth_key(auth_env) + wrangler_utils.remove_property(identifier, property, smaht_key) + + if __name__ == "__main__": cli() diff --git a/magma_smaht/constants.py b/magma_smaht/constants.py index 9740507..789cee3 100644 --- a/magma_smaht/constants.py +++ b/magma_smaht/constants.py @@ -32,7 +32,6 @@ LENGTH_REQUIRED = "length_required" LIBRARY_ID = "library_id" GENOME_REFERENCE_STAR = "genome_reference_star" -IS_STRANDED = "is_stranded" STRANDEDNESS = "strandedness" # Schema fields @@ -47,6 +46,7 @@ FILE_SETS = "file_sets" META_WORFLOW_RUN = "MetaWorkflowRun" ACCESSION = "accession" +DISPLAY_TITLE = "display_title" ALIASES = "aliases" UPLOADED = "uploaded" DELETED = "deleted" diff --git a/magma_smaht/create_metawfr.py b/magma_smaht/create_metawfr.py index 17a9172..35d3741 100644 --- a/magma_smaht/create_metawfr.py +++ b/magma_smaht/create_metawfr.py @@ -59,7 +59,6 @@ LENGTH_REQUIRED, LIBRARY_ID, GENOME_REFERENCE_STAR, - IS_STRANDED, STRANDEDNESS, COMMON_FIELDS, UUID, @@ -135,7 +134,6 @@ def mwfr_rnaseq_alignment(fileset_accession, sequence_length, smaht_key): strandedness_mapping = {FIRST_STRANDED: "rf", SECOND_STRANDED: "fr"} mwfr_input.extend( [ - get_mwfr_parameter_input_arg(IS_STRANDED, "true"), get_mwfr_parameter_input_arg( STRANDEDNESS, strandedness_mapping[strand] ), diff --git a/magma_smaht/wrangler_utils.py b/magma_smaht/wrangler_utils.py index ab61686..ed8917c 100644 --- a/magma_smaht/wrangler_utils.py +++ b/magma_smaht/wrangler_utils.py @@ -26,6 +26,7 @@ COMPLETED, UUID, ACCESSION, + DISPLAY_TITLE, COMMON_FIELDS, FILE_SETS, CONSORTIA, @@ -35,7 +36,7 @@ TAGS, STATUS, DELETED, - FAILED_JOBS + FAILED_JOBS, ) JsonObject = Dict[str, Any] @@ -144,8 +145,13 @@ def reset_mwfrs(mwfr_uuids: list, smaht_key: dict): reset_all(id, smaht_key) -def reset_all_failed_mwfrs(smaht_key: dict): - url = "/search/?final_status=failed&type=MetaWorkflowRun" +def reset_all_failed_mwfrs(smaht_key: dict, ignore_md5 : bool): + + url = ( + "/search/?final_status=failed&type=MetaWorkflowRun" + if not ignore_md5 + else "/search/?final_status=failed&type=MetaWorkflowRun&meta_workflow.name%21=md5" + ) results = ff_utils.search_metadata(url, key=smaht_key) for item in results: print(f"Reset MetaWorkflowRun {item['uuid']}") @@ -185,7 +191,13 @@ def rerun_mwfr( input = mwfr_old_raw["input"] mwfr = mwfr_from_input(mwf[UUID], input, input_arg, smaht_key) - props_to_copy = [COMMON_FIELDS, CONSORTIA, FILE_SETS, SUBMISSION_CENTERS, FAILED_JOBS] + props_to_copy = [ + COMMON_FIELDS, + CONSORTIA, + FILE_SETS, + SUBMISSION_CENTERS, + FAILED_JOBS, + ] for prop in 
diff --git a/magma_smaht/wrangler_utils.py b/magma_smaht/wrangler_utils.py
index ab61686..ed8917c 100644
--- a/magma_smaht/wrangler_utils.py
+++ b/magma_smaht/wrangler_utils.py
@@ -26,6 +26,7 @@
     COMPLETED,
     UUID,
     ACCESSION,
+    DISPLAY_TITLE,
     COMMON_FIELDS,
     FILE_SETS,
     CONSORTIA,
@@ -35,7 +36,7 @@
     TAGS,
     STATUS,
     DELETED,
-    FAILED_JOBS
+    FAILED_JOBS,
 )
 
 JsonObject = Dict[str, Any]
@@ -144,8 +145,13 @@ def reset_mwfrs(mwfr_uuids: list, smaht_key: dict):
         reset_all(id, smaht_key)
 
 
-def reset_all_failed_mwfrs(smaht_key: dict):
-    url = "/search/?final_status=failed&type=MetaWorkflowRun"
+def reset_all_failed_mwfrs(smaht_key: dict, ignore_md5: bool):
+
+    url = (
+        "/search/?final_status=failed&type=MetaWorkflowRun"
+        if not ignore_md5
+        else "/search/?final_status=failed&type=MetaWorkflowRun&meta_workflow.name%21=md5"
+    )
     results = ff_utils.search_metadata(url, key=smaht_key)
     for item in results:
         print(f"Reset MetaWorkflowRun {item['uuid']}")
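The %21 in the hard-coded search URL above is simply the percent-encoded "!"
of the portal's field!=value (not-equal) filter, i.e. the query excludes
MetaWorkflowRuns whose meta_workflow.name is md5. A quick check in Python:

    from urllib.parse import quote, unquote

    # Decoding shows the filter in its readable form.
    assert unquote("meta_workflow.name%21=md5") == "meta_workflow.name!=md5"

    # Building the same filter programmatically (illustrative only).
    field, value = "meta_workflow.name", "md5"
    url = f"/search/?final_status=failed&type=MetaWorkflowRun&{quote(field + '!')}={value}"
    print(url)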
@@ -185,7 +191,13 @@ def rerun_mwfr(
     input = mwfr_old_raw["input"]
     mwfr = mwfr_from_input(mwf[UUID], input, input_arg, smaht_key)
 
-    props_to_copy = [COMMON_FIELDS, CONSORTIA, FILE_SETS, SUBMISSION_CENTERS, FAILED_JOBS]
+    props_to_copy = [
+        COMMON_FIELDS,
+        CONSORTIA,
+        FILE_SETS,
+        SUBMISSION_CENTERS,
+        FAILED_JOBS,
+    ]
     for prop in props_to_copy:
         if prop in mwfr_old_raw:
             mwfr[prop] = mwfr_old_raw[prop]
@@ -211,8 +223,8 @@ def rerun_mwfr(
     )
     mwfr[DESCRIPTION] = f"Rerun of MetaWorkflow {mwfr_old_raw[ACCESSION]}"
-    #mwfr["final_status"] = "stopped"
-    # pprint.pprint(mwfr)
+    # mwfr["final_status"] = "stopped"
+
     post_response = ff_utils.post_metadata(mwfr, META_WORFLOW_RUN, smaht_key)
     mwfr_accession = post_response["@graph"][0]["accession"]
     print(f"Posted MetaWorkflowRun {mwfr_accession}.")
 
@@ -221,7 +233,7 @@ def rerun_mwfr(
     patch_body = {
         DESCRIPTION: f"This MetaWorkflowRun has been rerun due to an updated MetaWorkflow as {mwfr_accession}",
         TAGS: ["rerun"],
-        STATUS: DELETED
+        STATUS: DELETED,
     }
     ff_utils.patch_metadata(patch_body, obj_id=mwfr_old_raw[UUID], key=smaht_key)
     print(f"Tagged old MWFR {mwfr_old_raw[ACCESSION]} as rerun.")
@@ -277,14 +289,16 @@ def merge_qc_items(file_accession: str, mode: str, smaht_key: dict):
     print("Merging done.")
 
 
-def replace_qc_item(file_accession: str, keep_index: int, release: bool, smaht_key:dict):
+def replace_qc_item(
+    file_accession: str, keep_index: int, release: bool, smaht_key: dict
+):
     file = ff_utils.get_metadata(file_accession, smaht_key)
     file_uuid = file["uuid"]
     file_qms = file.get("quality_metrics", [])
     if len(file_qms) < 2 or keep_index > len(file_qms):
         print(f"ERROR: Not enough QM items present.")
         return
-    
+
     qm_uuid_to_keep = file_qms[keep_index][UUID]
     try:
         patch_body = {"quality_metrics": [qm_uuid_to_keep]}
@@ -292,7 +306,7 @@ def replace_qc_item(file_accession: str, keep_index: int, release: bool, smaht_k
         print("QC item replaced.")
     except Exception as e:
         raise Exception(f"Item could not be PATCHed: {str(e)}")
-    
+
     if release:
         qm_item = ff_utils.get_metadata(qm_uuid_to_keep, smaht_key)
         zip_url = qm_item["url"]
@@ -305,15 +319,15 @@ def replace_qc_item(file_accession: str, keep_index: int, release: bool, smaht_k
             print(f"Zip file {zip_uuid} released.")
         except Exception as e:
             raise Exception(f"Item could not be PATCHed: {str(e)}")
-    
+
 
 def archive_unaligned_reads(fileset_accession: str, dry_run: bool, smaht_key: dict):
     """Archive (submitted) unaligned reads of a fileset.
-    Every submitted unaligned read in the fileset will receive the s3_lifecycle_categor=long_term_archive.
+    Every submitted unaligned read in the fileset will receive the s3_lifecycle_category=long_term_archive.
 
     Args:
-        fileset_accession (str): _description_
-        smaht_key (dict): _description_
+        fileset_accession (str): Fileset accession
+        smaht_key (dict): SMaHT key
 
     """
     file_set = get_file_set(fileset_accession, smaht_key)
@@ -337,6 +351,50 @@ def archive_unaligned_reads(fileset_accession: str, dry_run: bool, smaht_key: di
         print(f" - Archived file {unaligned_read['display_title']}")
 
 
+def archive_broad_crams_and_fastqs(fileset_accession: str, dry_run: bool, smaht_key: dict):
+    """Archive (submitted) CRAMs and the associated FASTQs from the conversion.
+    Every such file in the fileset will receive the s3_lifecycle_category=long_term_archive.
+
+    Args:
+        fileset_accession (str): Fileset accession
+        smaht_key (dict): SMaHT key
+    """
+    file_set = get_file_set(fileset_accession, smaht_key)
+
+    search_filter = (
+        "?type=SubmittedFile"
+        "&file_format.display_title=cram"
+        "&status=uploaded"
+        f"&file_sets.uuid={file_set[UUID]}"
+    )
+    crams = ff_utils.search_metadata(
+        f"/search/{search_filter}", key=smaht_key
+    )
+
+    search_filter = (
+        "?type=OutputFile"
+        "&file_format.display_title=fastq_gz"
+        "&status=uploaded"
+        f"&file_sets.uuid={file_set[UUID]}"
+    )
+    fastqs = ff_utils.search_metadata(
+        f"/search/{search_filter}", key=smaht_key
+    )
+    files = crams + fastqs
+    if dry_run:
+        print(f" - Patching {len(files)} files. DRY RUN - NOTHING PATCHED")
+    else:
+        print(f" - Patching {len(files)} files")
+
+    for file in files:
+        if not dry_run:
+            patch_body = {"s3_lifecycle_category": "long_term_archive"}
+            ff_utils.patch_metadata(
+                patch_body, obj_id=file[UUID], key=smaht_key
+            )
+        print(f" - Archived file {file['display_title']}")
+
+
 def sample_identity_check_status(num_files: int, smaht_key: dict):
     """Check output files that are not input of sample_identity_check metaworkflow runs.
 
@@ -376,6 +434,7 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
     donors = {}
     status = {}
     for output in output_files:
+        print(f"Checking file {output['display_title']}")
         mwfrs = output.get("meta_workflow_run_outputs")
         if not mwfrs:
             print(
@@ -444,6 +503,80 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
     )
 
 
+def purge_fileset(
+    fileset_accession: str,
+    dry_run: bool,
+    delete_submitted_files: bool,
+    delete_fileset: bool,
+    smaht_key: dict,
+):
+    """Delete all files in a fileset, delete associated MetaWorkflowRuns (incl. files)
+    and remove the fileset from the portal.
+
+    Args:
+        fileset_accession (str): Fileset accession
+        dry_run (bool): If True, do not delete files but print what would be deleted
+        delete_submitted_files (bool): If True, delete submitted files
+        delete_fileset (bool): If True, delete the fileset
+        smaht_key (dict): SMaHT key
+    """
+    file_set = get_item(fileset_accession, smaht_key, frame="embedded")
+    if not file_set:
+        print(f"Fileset {fileset_accession} not found.")
+        return
+
+    items_to_delete = []
+
+    if delete_submitted_files:
+        print(f"\nFiles on the file set that will be deleted:")
+        for file in file_set.get("files", []):
+            print(f" - {file[DISPLAY_TITLE]}")
+            items_to_delete.append(file[UUID])
+
+    print(f"\nMetaWorkflowRuns that will be deleted:")
+    for mwfr in file_set.get("meta_workflow_runs", []):
+        print(f" - MetaWorkflowRun {mwfr[DISPLAY_TITLE]}")
+        mwfr_item = get_item(mwfr[UUID], smaht_key, frame="embedded")
+        items_to_delete.append(mwfr_item[UUID])
+
+        workflow_runs = mwfr_item.get("workflow_runs", [])
+        for wfr in workflow_runs:
+            wfr_display_title = wfr["workflow_run"][DISPLAY_TITLE]
+            wfr_uuid = wfr["workflow_run"][UUID]
+            print(f" - WorkflowRun {wfr_display_title}")
+            items_to_delete.append(wfr_uuid)
+            outputs = wfr.get("output", [])
+            for output in outputs:
+                file = output.get("file", {})
+                if file and file.get(STATUS) != DELETED:
+                    print(f" - File {file[DISPLAY_TITLE]}")
+                    items_to_delete.append(file[UUID])
+
+    if delete_fileset:
+        print(f"\nFile set that will be deleted:")
+        print(f" - {file_set[ACCESSION]}")
+        items_to_delete.append(file_set[UUID])
+
+    if dry_run:
+        print(f"\nDRY RUN: Nothing deleted.")
+        return
+    else:
+        confirm = input("Are you sure you want to continue? [y/N]: ").strip().lower()
+
+        if confirm != 'y':
+            print(f"Aborted deletion of Fileset {file_set[ACCESSION]} and its files.")
+            return
+
+    for item_uuid in items_to_delete:
+        try:
+            ff_utils.patch_metadata(
+                {STATUS: DELETED}, obj_id=item_uuid, key=smaht_key
+            )
+            print(f"Deleted item {item_uuid}.")
+        except Exception as e:
+            print(f"Error deleting item {item_uuid}: {str(e)}")
+
+
 def print_error_and_exit(error):
     print(error)
     exit()
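Because purge_fileset is destructive, a sensible pattern is to call it with
dry_run=True first and inspect the printed inventory before deleting anything.
Hedged sketch; the accession, environment name, and the import path of
get_auth_key are assumptions for illustration:

    from magma_smaht import wrangler_utils
    from magma_smaht.utils import get_auth_key  # assumed location of get_auth_key

    smaht_key = get_auth_key("data")  # "data": hypothetical smaht-keys environment

    # Preview only: lists the files, MetaWorkflowRuns, WorkflowRuns and the
    # fileset that would be set to status=deleted; nothing is patched.
    wrangler_utils.purge_fileset(
        "SMAFS000DDD",  # hypothetical fileset accession
        dry_run=True,
        delete_submitted_files=True,
        delete_fileset=True,
        smaht_key=smaht_key,
    )
    # A second call with dry_run=False prompts for confirmation ([y/N]) before
    # patching each collected item to status=deleted.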
[y/N]: ").strip().lower() + + if confirm != 'y': + print(f"Aborted deletion of Fileset {file_set[ACCESSION]} and its files.") + return + + for item_uuid in items_to_delete: + try: + ff_utils.patch_metadata( + {STATUS: DELETED}, obj_id=item_uuid, key=smaht_key + ) + print(f"Deleted item {item_uuid}.") + except Exception as e: + print(f"Error deleting item {item_uuid}: {str(e)}") + + def print_error_and_exit(error): print(error) exit() @@ -457,3 +590,12 @@ def set_property(uuid: str, prop_key: str, prop_value: Any, smaht_key: Dict[str, print(f"Set item {uuid} property {prop_key} to {prop_value}.") except Exception as e: raise Exception(f"Item could not be PATCHed: {str(e)}") + + +def remove_property(uuid: str, property: str, smaht_key: Dict[str, Any]): + """ "Removes a property from an item.""" + try: + ff_utils.patch_metadata({}, obj_id=f"{uuid}?delete_fields={property}", key=smaht_key) + print(f"Removed property {property} from item {uuid}.") + except Exception as e: + raise Exception(f"Item could not be PATCHed: {str(e)}") diff --git a/pyproject.toml b/pyproject.toml index 058477d..7d07690 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "magma-suite" -version = "3.10.0" +version = "3.11.0" description = "Collection of tools to manage meta-workflows automation." authors = ["Michele Berselli ", "Doug Rioux", "Soo Lee", "CGAP team"] license = "MIT"